This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_and_reserve305
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve305 by this push:
     new 9bf99e07ebf streaming window function
9bf99e07ebf is described below

commit 9bf99e07ebf9ff66fdd90d2fa0a92c7035a33d74
Author: zhangstar333 <zhangs...@selectdb.com>
AuthorDate: Sun Apr 20 08:21:07 2025 +0800

    streaming window function
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
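    Problem Summary:
    
    Evaluate window (analytic) functions in the analytic sink operator instead of the
    analytic source operator, so that frames ending at CURRENT ROW with an UNBOUNDED
    PRECEDING start can be computed and emitted as input blocks arrive ("streaming")
    rather than only after the whole partition has been buffered. Finished blocks are
    handed to the source through a shared buffer. A new BE config,
    `enable_streaming_analytic`, controls the streaming behaviour.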
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 be/src/common/config.cpp                           |   1 +
 be/src/common/config.h                             |   1 +
 be/src/pipeline/dependency.h                       |  34 +-
 be/src/pipeline/exec/analytic_sink_operator.cpp    | 888 +++++++++++++++++----
 be/src/pipeline/exec/analytic_sink_operator.h      | 171 +++-
 be/src/pipeline/exec/analytic_source_operator.cpp  | 594 +-------------
 be/src/pipeline/exec/analytic_source_operator.h    | 102 +--
 .../vec/aggregate_functions/aggregate_function.h   |  13 +
 .../aggregate_function_window.h                    |  91 ++-
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 .../window_functions/test_column_boundary.out      | Bin 0 -> 119 bytes
 .../window_functions/test_column_boundary.groovy   |  54 ++
 12 files changed, 1063 insertions(+), 888 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1359ae51080..ca510308fa6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -168,6 +168,7 @@ DEFINE_mBool(enable_query_memory_overcommit, "true");
 DEFINE_mBool(disable_memory_gc, "false");
 
 DEFINE_mBool(enable_stacktrace, "true");
+// Switch for streaming evaluation of analytic (window) functions; see the declaration in config.h.
+DEFINE_mBool(enable_streaming_analytic, "true");
 
 DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 60a718d1e70..f5af314fa3a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -207,6 +207,7 @@ DECLARE_mBool(disable_memory_gc);
 
 // if false, turn off all stacktrace
 DECLARE_mBool(enable_stacktrace);
+// If true, the analytic (window function) sink may evaluate eligible window frames in
+// streaming mode, i.e. compute results as input blocks arrive instead of waiting for the
+// whole partition to be buffered. Which frames honour this switch is decided in
+// AnalyticSinkLocalState::init().
+DECLARE_mBool(enable_streaming_analytic);
 
 // when alloc memory larger than stacktrace_in_alloc_large_memory_bytes, default 2G,
 // if alloc successful, will print a warning with stacktrace, but not prevent memory alloc.
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index e010ad7e6cd..a731d42c5b8 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -573,41 +573,15 @@ struct MultiCastSharedState : public BasicSharedState,
     void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
 };
 
-struct BlockRowPos {
-    int64_t block_num {}; //the pos at which block
-    int64_t row_num {};   //the pos at which row
-    int64_t pos {};       //pos = all blocks size + row_num
-    std::string debug_string() const {
-        std::string res = "\t block_num: ";
-        res += std::to_string(block_num);
-        res += "\t row_num: ";
-        res += std::to_string(row_num);
-        res += "\t pos: ";
-        res += std::to_string(pos);
-        return res;
-    }
-};
-
 struct AnalyticSharedState : public BasicSharedState {
     ENABLE_FACTORY_CREATOR(AnalyticSharedState)
 
 public:
     AnalyticSharedState() = default;
-
-    int64_t current_row_position = 0;
-    BlockRowPos partition_by_end;
-    int64_t input_total_rows = 0;
-    BlockRowPos all_block_end;
-    std::vector<vectorized::Block> input_blocks;
-    bool input_eos = false;
-    BlockRowPos found_partition_end;
-    std::vector<int64_t> origin_cols;
-    std::vector<int64_t> input_block_first_row_positions;
-    std::vector<std::vector<vectorized::MutableColumnPtr>> agg_input_columns;
-
-    // TODO: maybe global?
-    std::vector<int64_t> partition_by_column_idxs;
-    std::vector<int64_t> ordey_by_column_idxs;
+    // Finished output blocks (input columns plus window-function results) pushed by the sink
+    // and waiting to be read by the source; always accessed under buffer_mutex.
+    std::queue<vectorized::Block> blocks_buffer;
+    std::mutex buffer_mutex;
+    // NOTE(review): not used by the sink code in this diff; presumably the sink's end-of-input
+    // flag read by the source, guarded by sink_eos_lock — confirm against the source operator.
+    bool sink_eos = false;
+    std::mutex sink_eos_lock;
 };
 
 struct JoinSharedState : public BasicSharedState {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index b3d1ceaa8ff..16382457f96 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -18,6 +18,10 @@
 
 #include "analytic_sink_operator.h"
 
+#include <glog/logging.h>
+
+#include <cstddef>
+#include <cstdint>
 #include <string>
 
 #include "pipeline/exec/operator.h"
@@ -31,10 +35,77 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
-    _evaluation_timer = ADD_TIMER(profile(), "GetPartitionBoundTime");
+    _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");
     _compute_agg_data_timer = ADD_TIMER(profile(), "ComputeAggDataTime");
     _compute_partition_by_timer = ADD_TIMER(profile(), "ComputePartitionByTime");
     _compute_order_by_timer = ADD_TIMER(profile(), "ComputeOrderByTime");
+    _compute_range_between_function_timer = ADD_TIMER(profile(), "ComputeRangeBetweenTime");
+    _partition_search_timer = ADD_TIMER(profile(), "PartitionSearchTime");
+    _order_search_timer = ADD_TIMER(profile(), "OrderSearchTime");
+    _remove_rows_timer = ADD_TIMER(profile(), "RemoveRowsTime");
+    _remove_rows = ADD_COUNTER(profile(), "RemoveRows", TUnit::UNIT);
+    _remove_count = ADD_COUNTER(profile(), "RemoveCount", TUnit::UNIT);
+    _blocks_memory_usage =
+            profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1);
+    _agg_arena_pool = std::make_unique<vectorized::Arena>();
+    auto& p = _parent->cast<AnalyticSinkOperatorX>();
+    // Choose the routine that evaluates each row's window frame. Frames starting at UNBOUNDED
+    // PRECEDING and ending at CURRENT ROW may run in streaming mode (results emitted before the
+    // whole partition is buffered); config::enable_streaming_analytic switches that off.
+    if (!p._has_window) { //haven't set window, Unbounded:  [unbounded preceding,unbounded following]
+        // For window frame `ROWS|RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`
+        _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_partition;
+    } else if (p._has_range_window) {
+        if (!p._has_window_start && !p._has_window_end) {
+            _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_partition;
+        } else {
+            if (!p._has_window_start &&
+                p._window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW) {
+                // For window frame `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`
+                _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_unbounded_range;
+                _streaming_mode = config::enable_streaming_analytic;
+            } else {
+                _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_range_between;
+            }
+        }
+    } else {
+        // haven't set start and end, same as PARTITION
+        if (!p._has_window_start && !p._has_window_end) {
+            _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_partition;
+        } else {
+            if (!p._has_window_start &&
+                p._window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW) {
+                // For window frame `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`; honour the
+                // same switch as the RANGE case so streaming can be disabled globally.
+                _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_unbounded_rows;
+                _streaming_mode = config::enable_streaming_analytic;
+            } else {
+                _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_sliding_rows;
+            }
+        }
+
+        // ROWS offsets are relative to the current row:
+        // PRECEDING -> negative, CURRENT ROW -> 0, FOLLOWING -> positive.
+        if (p._has_window_start) { //calculate start boundary
+            const TAnalyticWindowBoundary& b = p._window.window_start;
+            if (b.__isset.rows_offset_value) { //[offset, ]
+                _rows_start_offset = b.rows_offset_value;
+                if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
+                    _rows_start_offset *= -1;
+                }
+            } else {
+                DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW); //[current row, ]
+                _rows_start_offset = 0;
+            }
+        }
+
+        if (p._has_window_end) { //calculate end boundary
+            const TAnalyticWindowBoundary& b = p._window.window_end;
+            if (b.__isset.rows_offset_value) { //[ , offset]
+                _rows_end_offset = b.rows_offset_value;
+                if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
+                    _rows_end_offset *= -1;
+                }
+            } else {
+                DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW); //[ , current row]
+                _rows_end_offset = 0;
+            }
+        }
+    }
+    // NOTE: open() may still disable streaming mode (functions in PARTITION_FUNCTION_SET).
+    profile()->add_info_string("streaming mode: ", std::to_string(_streaming_mode));
     return Status::OK();
 }
 
@@ -43,159 +114,502 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<AnalyticSinkOperatorX>();
-    _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
-    _shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());
-
-    size_t agg_size = p._agg_expr_ctxs.size();
-    _agg_expr_ctxs.resize(agg_size);
-    _shared_state->agg_input_columns.resize(agg_size);
-    for (int i = 0; i < agg_size; ++i) {
-        _shared_state->agg_input_columns[i].resize(p._num_agg_input[i]);
+
+    // Clone the operator-level window functions and expression contexts into this local state
+    // and create the (empty) input/result column buffers they use.
+    _agg_functions_size = p._agg_functions_size;
+    _agg_expr_ctxs.resize(_agg_functions_size);
+    _agg_functions.resize(_agg_functions_size);
+    _agg_input_columns.resize(_agg_functions_size);
+    _offsets_of_aggregate_states.resize(_agg_functions_size);
+    _result_column_nullable_flags.resize(_agg_functions_size);
+    _result_column_could_resize.resize(_agg_functions_size);
+
+    for (int i = 0; i < _agg_functions_size; ++i) {
+        _agg_functions[i] = p._agg_functions[i]->clone(state, state->obj_pool());
+        _agg_input_columns[i].resize(p._num_agg_input[i]);
         _agg_expr_ctxs[i].resize(p._agg_expr_ctxs[i].size());
         for (int j = 0; j < p._agg_expr_ctxs[i].size(); ++j) {
             RETURN_IF_ERROR(p._agg_expr_ctxs[i][j]->clone(state, _agg_expr_ctxs[i][j]));
+            _agg_input_columns[i][j] = _agg_expr_ctxs[i][j]->root()->data_type()->create_column();
         }
-
-        for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) {
-            _shared_state->agg_input_columns[i][j] =
-                    _agg_expr_ctxs[i][j]->root()->data_type()->create_column();
+        _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i];
+        // The function returns a non-nullable type but its output column is nullable: for an
+        // empty frame the result row is written as a default instead of calling the function.
+        _result_column_nullable_flags[i] =
+                !_agg_functions[i]->function()->get_return_type()->is_nullable() &&
+                _agg_functions[i]->data_type()->is_nullable();
+        _result_column_could_resize[i] =
+                _agg_functions[i]->function()->result_column_could_resize();
+        // Functions in PARTITION_FUNCTION_SET disable streaming evaluation for this operator.
+        if (PARTITION_FUNCTION_SET.contains(_agg_functions[i]->function()->get_name())) {
+            _streaming_mode = false;
         }
     }
-    _partition_by_eq_expr_ctxs.resize(p._partition_by_eq_expr_ctxs.size());
-    for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); i++) {
+
+    _partition_exprs_size = p._partition_by_eq_expr_ctxs.size();
+    _partition_by_eq_expr_ctxs.resize(_partition_exprs_size);
+    _partition_by_columns.resize(_partition_exprs_size);
+    for (size_t i = 0; i < _partition_exprs_size; i++) {
         RETURN_IF_ERROR(
                 p._partition_by_eq_expr_ctxs[i]->clone(state, _partition_by_eq_expr_ctxs[i]));
+        _partition_by_columns[i] =
+                _partition_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
     }
-    _order_by_eq_expr_ctxs.resize(p._order_by_eq_expr_ctxs.size());
-    for (size_t i = 0; i < _order_by_eq_expr_ctxs.size(); i++) {
+
+    _order_by_exprs_size = p._order_by_eq_expr_ctxs.size();
+    _order_by_eq_expr_ctxs.resize(_order_by_exprs_size);
+    _order_by_columns.resize(_order_by_exprs_size);
+    for (size_t i = 0; i < _order_by_exprs_size; i++) {
         RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, _order_by_eq_expr_ctxs[i]));
+        _order_by_columns[i] = _order_by_eq_expr_ctxs[i]->root()->data_type()->create_column();
     }
+
+    // RANGE frames with offsets support a single ORDER BY column; the contexts evaluate the
+    // frame's lower ([0]) and upper ([1]) bound for each row. Both vectors are sized from the
+    // operator's contexts *before* they are indexed (sizing from the local, not-yet-filled
+    // vector left _range_result_columns empty and made the writes below out of bounds).
+    _range_between_expr_ctxs.resize(p._range_between_expr_ctxs.size());
+    _range_result_columns.resize(p._range_between_expr_ctxs.size());
+    for (size_t i = 0; i < _range_between_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._range_between_expr_ctxs[i]->clone(state, _range_between_expr_ctxs[i]));
+        _range_result_columns[i] =
+                _range_between_expr_ctxs[i]->root()->data_type()->create_column();
+    }
+
+    _fn_place_ptr = _agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states,
+                                                   p._align_aggregate_states);
+    _create_agg_status();
+    // _streaming_mode may have been switched off above, so re-publish the final value that
+    // init() reported to the profile.
+    profile()->add_info_string("streaming mode: ", std::to_string(_streaming_mode));
     return Status::OK();
 }
 
-bool AnalyticSinkLocalState::_whether_need_next_partition(BlockRowPos& found_partition_end) {
-    auto& shared_state = *_shared_state;
-    if (shared_state.input_eos ||
-        (shared_state.current_row_position <
-         shared_state.partition_by_end.pos)) { //now still have partition data
-        return false;
+// Releases the aggregate states, the arena that backs them and the cached column buffers, then
+// closes the base local state. Returns early if this local state is already closed.
+Status AnalyticSinkLocalState::close(RuntimeState* state, Status exec_status) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_close_timer);
+    if (_closed) {
+        return Status::OK();
+    }
+
+    // Destroy the aggregate states before dropping the arena their memory was allocated from.
+    _destroy_agg_status();
+    _agg_arena_pool = nullptr;
+    _fn_place_ptr = nullptr;
+    _result_window_columns.clear();
+    _agg_input_columns.clear();
+    _partition_by_columns.clear();
+    _order_by_columns.clear();
+    _range_result_columns.clear();
+    return PipelineXSinkLocalState<AnalyticSharedState>::close(state, exec_status);
+}
+
+// ROWS frame with explicit bounds: for every row the aggregate state is reset and recomputed over
+// [row + _rows_start_offset, row + _rows_end_offset + 1) (start = partition start when there is
+// no window start); the partition bounds are passed along with the frame.
+// Returns true once every row of the current input block has a result.
+bool AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t batch_rows,
+                                                        int64_t current_block_base_pos) {
+    while (_current_row_position < _partition_by_pose.end) {
+        int64_t current_row_start = 0;
+        int64_t current_row_end = _current_row_position + _rows_end_offset + 1;
+
+        _reset_agg_status();
+        if (!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_start) {
+            current_row_start = _partition_by_pose.start;
+        } else {
+            current_row_start = _current_row_position + _rows_start_offset;
+        }
+        // Eg: rows between unbounded preceding and 10 preceding
+        // Make sure range_start <= range_end
+        current_row_start = std::min(current_row_start, current_row_end);
+        _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, current_row_start,
+                              current_row_end);
+        int64_t pos = current_pos_in_block();
+        _insert_result_info(pos, pos + 1);
+        _current_row_position++;
+        if (_current_row_position - current_block_base_pos >= batch_rows) {
+            return true;
+        }
     }
-    if ((_partition_by_eq_expr_ctxs.empty() && !shared_state.input_eos) ||
-        (found_partition_end.pos == 0)) { //no partition, get until fetch to EOS
-        return true;
+    return false;
+}
+
+// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: the aggregate state is a running value over
+// the partition, so each row only adds itself to it. Returns true once every row of the current
+// input block has a result.
+bool AnalyticSinkLocalState::_get_next_for_unbounded_rows(int64_t batch_rows,
+                                                          int64_t current_block_base_pos) {
+    while (_current_row_position < _partition_by_pose.end) {
+        // The frame only grows by the current row, so the previous result is reused: do not call
+        // _reset_agg_status(), just add this row to the accumulated state. (The original note
+        // suggests [current_row, following] frames are rewritten into this shape too — confirm.)
+        _execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
+                              _current_row_position, _current_row_position + 1);
+        int64_t pos = current_pos_in_block();
+        _insert_result_info(pos, pos + 1);
+        _current_row_position++;
+        if (_current_row_position - current_block_base_pos >= batch_rows) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Whole-partition frame (no window, or neither bound set): the aggregate is computed once, when
+// the partition's first row is reached; afterwards the same result is written for as many rows of
+// the current input block as the partition covers. Returns true once the block is complete.
+bool AnalyticSinkLocalState::_get_next_for_partition(int64_t batch_rows,
+                                                     int64_t current_block_base_pos) {
+    if (_current_row_position == _partition_by_pose.start) {
+        _execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
+                              _partition_by_pose.start, _partition_by_pose.end);
+    }
+
+    // the partition end may lie several blocks later, but output is produced per input block
+    // (batch size) and must not exceed the partition end
+    auto window_end_pos = _current_row_position + batch_rows;
+    window_end_pos = std::min<int64_t>(window_end_pos, _partition_by_pose.end);
+
+    auto previous_window_frame_width = _current_row_position - current_block_base_pos;
+    auto current_window_frame_width = window_end_pos - current_block_base_pos;
+    // should not exceed block batch size
+    current_window_frame_width = std::min<int64_t>(current_window_frame_width, batch_rows);
+    auto real_deal_with_width = current_window_frame_width - previous_window_frame_width;
+    int64_t pos = current_pos_in_block();
+    _insert_result_info(pos, pos + real_deal_with_width);
+    _current_row_position += real_deal_with_width;
+    return _current_row_position - current_block_base_pos >= batch_rows;
+}
+
+// RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: all rows with equal ORDER BY values (peers)
+// share one result, so the running aggregate is extended one peer group at a time and is not
+// reset between groups.
+// In streaming mode the last peer group of a still-open partition may continue in the next input
+// block. Its rows must not be emitted yet (they would miss later peers), so the function returns
+// false without progress and _execute_impl() waits for more input.
+bool AnalyticSinkLocalState::_get_next_for_unbounded_range(int64_t batch_rows,
+                                                           int64_t current_block_base_pos) {
+    while (_current_row_position < _partition_by_pose.end) {
+        _update_order_by_range();
+        if (!_order_by_pose.is_ended) {
+            // Only possible while the partition itself is still open (streaming mode).
+            DCHECK(!_partition_by_pose.is_ended);
+            return false;
+        }
+        if (_current_row_position == _order_by_pose.start) {
+            _execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
+                                  _order_by_pose.start, _order_by_pose.end);
+        }
+        auto previous_window_frame_width = _current_row_position - current_block_base_pos;
+        auto current_window_frame_width = _order_by_pose.end - current_block_base_pos;
+        current_window_frame_width = std::min<int64_t>(current_window_frame_width, batch_rows);
+        auto real_deal_with_width = current_window_frame_width - previous_window_frame_width;
+        int64_t pos = current_pos_in_block();
+        _insert_result_info(pos, pos + real_deal_with_width);
+        _current_row_position += real_deal_with_width;
+        if (_current_row_position - current_block_base_pos >= batch_rows) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Any other RANGE frame (e.g. offset bounds): for every row the frame bounds are located by
+// comparing _range_result_columns[0] (lower bound) / [1] (upper bound) with the single ORDER BY
+// column, and the aggregate is recomputed from scratch.
+bool AnalyticSinkLocalState::_get_next_for_range_between(int64_t batch_rows,
+                                                         int64_t current_block_base_pos) {
+    while (_current_row_position < _partition_by_pose.end) {
+        _reset_agg_status();
+        if (!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_start) {
+            _order_by_pose.start = _partition_by_pose.start;
+        } else {
+            _order_by_pose.start = find_first_not_equal(
+                    _range_result_columns[0].get(), _order_by_columns[0].get(),
+                    _current_row_position, _order_by_pose.start, _partition_by_pose.end);
+        }
+
+        if (!_parent->cast<AnalyticSinkOperatorX>()._window.__isset.window_end) {
+            _order_by_pose.end = _partition_by_pose.end;
+        } else {
+            _order_by_pose.end = find_first_not_equal(
+                    _range_result_columns[1].get(), _order_by_columns[0].get(),
+                    _current_row_position, _order_by_pose.end, _partition_by_pose.end);
+        }
+        _execute_for_function(_partition_by_pose.start, _partition_by_pose.end,
+                              _order_by_pose.start, _order_by_pose.end);
+        int64_t pos = current_pos_in_block();
+        _insert_result_info(pos, pos + 1);
+        _current_row_position++;
+        if (_current_row_position - current_block_base_pos >= batch_rows) {
+            return true;
+        }
     }
-    if (!_partition_by_eq_expr_ctxs.empty() &&
-        found_partition_end.pos == shared_state.all_block_end.pos &&
-        !shared_state.input_eos) { //current partition data calculate done
-        return true;
+    if (_current_row_position == _partition_by_pose.end) {
+        _order_by_pose.start = _partition_by_pose.end; // update to next partition pos
+        _order_by_pose.end = _partition_by_pose.end;
     }
     return false;
 }
 
-//_partition_by_columns,_order_by_columns save in blocks, so if need to calculate the boundary, may find in which blocks firstly
-BlockRowPos AnalyticSinkLocalState::_compare_row_to_find_end(int64_t idx, BlockRowPos start,
-                                                             BlockRowPos end,
-                                                             bool need_check_first) {
-    auto& shared_state = *_shared_state;
-    int64_t start_init_row_num = start.row_num;
-    vectorized::ColumnPtr start_column =
-            shared_state.input_blocks[start.block_num].get_by_position(idx).column;
-    vectorized::ColumnPtr start_next_block_column = start_column;
-
-    DCHECK_LE(start.block_num, end.block_num);
-    DCHECK_LE(start.block_num, shared_state.input_blocks.size() - 1);
-    int64_t start_block_num = start.block_num;
-    int64_t end_block_num = end.block_num;
-    int64_t mid_blcok_num = end.block_num;
-    // To fix this problem: https://github.com/apache/doris/issues/15951
-    // in this case, the partition by column is last row of block, so it's pointed to a new block at row = 0, range is: [left, right)
-    // From the perspective of order by column, the two values are exactly equal.
-    // so the range will be get wrong because it's compare_at == 0 with next block at row = 0
-    if (need_check_first && end.block_num > 0 && end.row_num == 0) {
-        end.block_num--;
-        end_block_num--;
-        end.row_num = shared_state.input_blocks[end_block_num].rows();
-    }
-    //binary search find in which block
-    while (start_block_num < end_block_num) {
-        mid_blcok_num = (start_block_num + end_block_num + 1) >> 1;
-        start_next_block_column =
-                shared_state.input_blocks[mid_blcok_num].get_by_position(idx).column;
-        //Compares (*this)[n] and rhs[m], this: start[init_row]  rhs: mid[0]
-        if (start_column->compare_at(start_init_row_num, 0, *start_next_block_column, 1) == 0) {
-            start_block_num = mid_blcok_num;
+// Drives evaluation over the buffered input blocks: for each block not yet output, find the
+// current partition end, evaluate rows with the frame-specific routine and push the block to the
+// source once every row has a result. Without streaming mode a partition is only evaluated once
+// it has ended; in streaming mode evaluation stops as soon as a call makes no progress on an
+// unfinished partition and resumes when more input arrives.
+Status AnalyticSinkLocalState::_execute_impl() {
+    while (_output_block_index < _input_blocks.size()) {
+        {
+            _get_partition_by_end();
+            // streaming_mode means there is no need to wait for the whole partition: data can be
+            // calculated as soon as it arrives
+            if (!_partition_by_pose.is_ended && !_streaming_mode) {
+                break;
+            }
+            _init_result_columns();
+            auto batch_rows = _input_blocks[_output_block_index].rows();
+            auto current_block_base_pos =
+                    _input_block_first_row_positions[_output_block_index] - _have_removed_rows;
+            bool should_output = false;
+            const auto row_position_before = _current_row_position;
+
+            {
+                SCOPED_TIMER(_evaluation_timer);
+                should_output =
+                        (this->*_executor.get_next_impl)(batch_rows, current_block_base_pos);
+            }
+
+            if (should_output) {
+                vectorized::Block block;
+                _output_current_block(&block);
+                _refresh_buffer_and_dependency_state(&block);
+            }
+            if (_current_row_position == _partition_by_pose.end && _partition_by_pose.is_ended) {
+                _reset_state_for_next_partition();
+            } else if (!should_output && !_partition_by_pose.is_ended &&
+                       _current_row_position == row_position_before) {
+                // Streaming mode made no progress on an unfinished partition (e.g. the current
+                // ORDER BY peer group may continue in the next block): wait for more input
+                // instead of re-evaluating the same rows forever.
+                break;
+            }
+        }
+    }
+    return Status::OK();
+}
+
+// Adds the frame [frame_start, frame_end) of the partition [partition_start, partition_end) to
+// every window function's aggregate state via add_range_single_place, and records in
+// _current_window_empty whether the frame, clipped to the partition, contains no rows.
+void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int64_t partition_end,
+                                                   int64_t frame_start, int64_t frame_end) {
+    // here is the core function, should not add timer
+    // If the end is not greater than the start, the current window should be empty.
+    _current_window_empty =
+            std::min(frame_end, partition_end) <= std::max(frame_start, partition_start);
+
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        // Empty frame and a nullable result column over a non-nullable function: nothing to
+        // aggregate, _insert_result_info() writes default values for these rows.
+        if (_result_column_nullable_flags[i] && _current_window_empty) {
+            continue;
+        }
+        // NOTE(review): this pointer vector is rebuilt on every call, i.e. once per row for
+        // sliding frames.
+        std::vector<const vectorized::IColumn*> agg_columns;
+        for (int j = 0; j < _agg_input_columns[i].size(); ++j) {
+            agg_columns.push_back(_agg_input_columns[i][j].get());
+        }
+        _agg_functions[i]->function()->add_range_single_place(
+                partition_start, partition_end, frame_start, frame_end,
+                _fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(),
+                _agg_arena_pool.get());
+    }
+}
+
+// Writes the current aggregate result of every window function into rows [start, end) of the
+// result columns of the block being built.
+void AnalyticSinkLocalState::_insert_result_info(int64_t start, int64_t end) {
+    // here is the core function, should not add timer
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        if (_result_column_nullable_flags[i]) {
+            if (_current_window_empty) {
+                // Empty frame: insert default values (for a nullable column presumably NULL).
+                // TODO(review): confirm this is the intended SQL result for every such function.
+                _result_window_columns[i]->insert_many_defaults(end - start);
+            } else {
+                // Mark the rows as not-null in the null map and let the function fill the
+                // nested (non-nullable) column.
+                auto* dst =
+                        assert_cast<vectorized::ColumnNullable*>(_result_window_columns[i].get());
+                dst->get_null_map_data().add_num_element(0, static_cast<uint32_t>(end - start));
+                _agg_functions[i]->function()->insert_result_into_range(
+                        _fn_place_ptr + _offsets_of_aggregate_states[i], dst->get_nested_column(),
+                        start, end);
+            }
         } else {
-            end_block_num = mid_blcok_num - 1;
-        }
-    }
-
-    // have check the start.block_num:  start_column[start_init_row_num] with mid_blcok_num start_next_block_column[0]
-    // now next block must not be result, so need check with end_block_num: start_next_block_column[last_row]
-    if (end_block_num == mid_blcok_num - 1) {
-        start_next_block_column =
-                shared_state.input_blocks[end_block_num].get_by_position(idx).column;
-        int64_t block_size = shared_state.input_blocks[end_block_num].rows();
-        if ((start_column->compare_at(start_init_row_num, block_size - 1, *start_next_block_column,
-                                      1) == 0)) {
-            start.block_num = end_block_num + 1;
-            start.row_num = 0;
-            return start;
-        }
-    }
-
-    //check whether need get column again, maybe same as first init
-    // if the start_block_num have move to forword, so need update start block num and compare it from row_num=0
-    if (start_block_num != start.block_num) {
-        start_init_row_num = 0;
-        start.block_num = start_block_num;
-        start_column = shared_state.input_blocks[start.block_num].get_by_position(idx).column;
-    }
-    //binary search, set start and end pos
-    int64_t start_pos = start_init_row_num;
-    int64_t end_pos = shared_state.input_blocks[start.block_num].rows();
-    //if end_block_num haven't moved, only start_block_num go to the end block
-    //so could use the end.row_num for binary search
-    if (start.block_num == end.block_num) {
-        end_pos = end.row_num;
-    }
-    while (start_pos < end_pos) {
-        int64_t mid_pos = (start_pos + end_pos) >> 1;
-        if (start_column->compare_at(start_init_row_num, mid_pos, *start_column, 1)) {
-            end_pos = mid_pos;
+            _agg_functions[i]->function()->insert_result_into_range(
+                    _fn_place_ptr + _offsets_of_aggregate_states[i], *_result_window_columns[i],
+                    start, end);
+        }
+    }
+}
+
+// Moves the current input block into *block, appends one result column per window function
+// (wrapped as nullable when _change_to_nullable_flags[i] is set) and advances to the next input
+// block. The moved block's bytes are subtracted from the buffered-blocks memory counter.
+void AnalyticSinkLocalState::_output_current_block(vectorized::Block* block) {
+    block->swap(std::move(_input_blocks[_output_block_index]));
+    _blocks_memory_usage->add(-block->allocated_bytes());
+    DCHECK(_parent->cast<AnalyticSinkOperatorX>()._change_to_nullable_flags.size() ==
+           _result_window_columns.size());
+    for (size_t i = 0; i < _result_window_columns.size(); ++i) {
+        DCHECK(_result_window_columns[i]);
+        DCHECK(_agg_functions[i]);
+        if (_parent->cast<AnalyticSinkOperatorX>()._change_to_nullable_flags[i]) {
+            block->insert({make_nullable(std::move(_result_window_columns[i])),
+                           make_nullable(_agg_functions[i]->data_type()), ""});
         } else {
-            start_pos = mid_pos + 1;
+            block->insert(
+                    {std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""});
+        }
+    }
+
+    _output_block_index++;
+}
+
+// Creates fresh result columns, one per window function, but only when evaluation is at the first
+// row of the current output block; a partially filled block keeps its columns across calls.
+// Columns of functions that allow it are resized to the block size, the others only reserve it.
+void AnalyticSinkLocalState::_init_result_columns() {
+    if (_current_row_position + _have_removed_rows ==
+        _input_block_first_row_positions[_output_block_index]) {
+        _result_window_columns.resize(_agg_functions_size);
+        // return type create result column
+        for (size_t i = 0; i < _agg_functions_size; ++i) {
+            _result_window_columns[i] = _agg_functions[i]->data_type()->create_column();
+            if (_result_column_could_resize[i]) {
+                _result_window_columns[i]->resize(_input_blocks[_output_block_index].rows());
+            } else {
+                _result_window_columns[i]->reserve(_input_blocks[_output_block_index].rows());
+            }
+        }
+    }
+}
+
+// Hands a finished output block to the source through the shared buffer (under buffer_mutex).
+// When more than 128 blocks are buffered the sink dependency is blocked (back-pressure); the
+// source is always signalled that data is ready to read.
+void AnalyticSinkLocalState::_refresh_buffer_and_dependency_state(vectorized::Block* block) {
+    size_t buffer_size = 0;
+    {
+        std::unique_lock<std::mutex> lc(_shared_state->buffer_mutex);
+        _shared_state->blocks_buffer.push(std::move(*block));
+        buffer_size = _shared_state->blocks_buffer.size();
+    }
+    // NOTE(review): 128 is a hard-coded block-count threshold, independent of block size.
+    if (buffer_size > 128) {
+        // buffer have enough data, could block the sink
+        _dependency->block();
+    }
+    // buffer have push data, could signal the source to read
+    _dependency->set_ready_to_read();
+}
+
+// Called once every row of a finished partition has a result: records the partition size
+// statistic, resets the ORDER BY statistics, moves to the first row of the next partition and
+// resets the aggregate states.
+void AnalyticSinkLocalState::_reset_state_for_next_partition() {
+    _partition_column_statistics.update(_partition_by_pose.end - _partition_by_pose.start);
+    _order_by_column_statistics.reset();
+    _partition_by_pose.start = _partition_by_pose.end;
+    _current_row_position = _partition_by_pose.start;
+    _reset_agg_status();
+}
+
+void AnalyticSinkLocalState::_update_order_by_range() {
+    // still have more data
+    if (_order_by_pose.is_ended && _current_row_position < _order_by_pose.end) 
{
+        return;
+    }
+    SCOPED_TIMER(_order_search_timer);
+    while (!_next_order_by_ends.empty()) {
+        int64_t peek = _next_order_by_ends.front();
+        _next_order_by_ends.pop();
+        if (peek > _order_by_pose.end) {
+            _order_by_pose.start = _order_by_pose.end;
+            _order_by_pose.end = peek;
+            _order_by_pose.is_ended = true;
+            _order_by_column_statistics.update(_order_by_pose.end - 
_order_by_pose.start);
+            return;
+        }
+    }
+
+    if (_order_by_pose.is_ended) {
+        _order_by_pose.start = _order_by_pose.end;
+    }
+    _order_by_pose.end = _partition_by_pose.end;
+
+    {
+        if (_order_by_pose.start < _order_by_pose.end) {
+            for (size_t i = 0; i < _order_by_exprs_size; ++i) {
+                _order_by_pose.end = find_first_not_equal(
+                        _order_by_columns[i].get(), _order_by_columns[i].get(),
+                        _order_by_pose.start, _order_by_pose.start, 
_order_by_pose.end);
+            }
+        }
+    }
+
+    if (_order_by_pose.end < _partition_by_pose.end) {
+        _order_by_column_statistics.update(_order_by_pose.end - 
_order_by_pose.start);
+        _order_by_pose.is_ended = true;
+        _find_next_order_by_ends();
+        return;
+    }
+    DCHECK_EQ(_partition_by_pose.end, _order_by_pose.end);
+    if (_partition_by_pose.is_ended) {
+        _order_by_pose.is_ended = true;
+        return;
+    }
+    _order_by_pose.is_ended = false;
+}
+
+void AnalyticSinkLocalState::_get_partition_by_end() {
+    //still have data, return partition_by_end directly
+    if (_partition_by_pose.is_ended && _current_row_position < 
_partition_by_pose.end) {
+        return;
+    }
+    //no partition_by, the all block is end
+    if (_partition_by_eq_expr_ctxs.empty() || (_input_total_rows == 0)) {
+        _partition_by_pose.end = _input_total_rows - _have_removed_rows;
+        _partition_by_pose.is_ended = _input_eos;
+        return;
+    }
+    SCOPED_TIMER(_partition_search_timer);
+    while (!_next_partition_ends.empty()) {
+        int64_t peek = _next_partition_ends.front();
+        _next_partition_ends.pop();
+        if (peek > _partition_by_pose.end) {
+            _partition_by_pose.end = peek;
+            _partition_by_pose.is_ended = true;
+            return;
+        }
+    }
+
+    const auto start = _partition_by_pose.end;
+    const auto target = (_partition_by_pose.is_ended || _partition_by_pose.end 
== 0)
+                                ? _partition_by_pose.end
+                                : _partition_by_pose.end - 1;
+    DCHECK(_partition_exprs_size > 0);
+    const auto partition_column_rows = _partition_by_columns[0]->size();
+    _partition_by_pose.end = partition_column_rows;
+
+    {
+        if (start < _partition_by_pose.end) {
+            for (size_t i = 0; i < _partition_exprs_size; ++i) {
+                _partition_by_pose.end = find_first_not_equal(
+                        _partition_by_columns[i].get(), 
_partition_by_columns[i].get(), target,
+                        start, _partition_by_pose.end);
+            }
+        }
+    }
+
+    if (_partition_by_pose.end < partition_column_rows) {
+        _partition_by_pose.is_ended = true;
+        _find_next_partition_ends();
+        return;
+    }
+
+    DCHECK_EQ(_partition_by_pose.end, partition_column_rows);
+    _partition_by_pose.is_ended = _input_eos;
+}
+
+void AnalyticSinkLocalState::_find_next_partition_ends() {
+    if (!_partition_column_statistics.is_high_cardinality()) {
+        return;
+    }
+
+    SCOPED_TIMER(_partition_search_timer);
+    for (size_t i = _partition_by_pose.end + 1; i < 
_partition_by_columns[0]->size(); ++i) {
+        for (auto& column : _partition_by_columns) {
+            auto cmp = column->compare_at(i - 1, i, *column, 1);
+            if (cmp != 0) {
+                _next_partition_ends.push(i);
+                break;
+            }
         }
     }
-    start.row_num = start_pos; //update row num, return the find end
-    return start;
 }
 
-BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() {
-    auto& shared_state = *_shared_state;
-    if (shared_state.current_row_position <
-        shared_state.partition_by_end.pos) { //still have data, return 
partition_by_end directly
-        return shared_state.partition_by_end;
+void AnalyticSinkLocalState::_find_next_order_by_ends() {
+    if (!_order_by_column_statistics.is_high_cardinality()) {
+        return;
     }
 
-    if (_partition_by_eq_expr_ctxs.empty() ||
-        (shared_state.input_total_rows == 0)) { //no partition_by, the all 
block is end
-        return shared_state.all_block_end;
+    SCOPED_TIMER(_order_search_timer);
+    for (size_t i = _order_by_pose.end + 1; i < _partition_by_pose.end; ++i) {
+        for (auto& column : _order_by_columns) {
+            auto cmp = column->compare_at(i - 1, i, *column, 1);
+            if (cmp != 0) {
+                _next_order_by_ends.push(i);
+                break;
+            }
+        }
     }
+}
 
-    BlockRowPos cal_end = shared_state.all_block_end;
-    for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size();
-         ++i) { //have partition_by, binary search the partiton end
-        cal_end = 
_compare_row_to_find_end(shared_state.partition_by_column_idxs[i],
-                                           shared_state.partition_by_end, 
cal_end);
+// Compares (*this)[n] and rhs[m]
+int64_t AnalyticSinkLocalState::find_first_not_equal(vectorized::IColumn* 
reference_column,
+                                                     vectorized::IColumn* 
compared_column,
+                                                     int64_t target, int64_t 
start, int64_t end) {
+    while (start + 1 < end) {
+        int64_t mid = start + (end - start) / 2;
+        if (reference_column->compare_at(target, mid, *compared_column, 1) == 
0) {
+            start = mid;
+        } else {
+            end = mid;
+        }
+    }
+    if (reference_column->compare_at(target, end - 1, *compared_column, 1) == 
0) {
+        return end;
     }
-    cal_end.pos = 
shared_state.input_block_first_row_positions[cal_end.block_num] + 
cal_end.row_num;
-    return cal_end;
+    return end - 1;
 }
 
 AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
                                              const TPlanNode& tnode, const 
DescriptorTbl& descs,
                                              bool require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id),
+          _pool(pool),
+          _intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id),
+          _output_tuple_id(tnode.analytic_node.output_tuple_id),
           _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
                                      ? tnode.analytic_node.buffered_tuple_id
                                      : 0),
@@ -203,20 +617,33 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* 
pool, int operator_id,
           _require_bucket_distribution(require_bucket_distribution),
           _partition_exprs(tnode.__isset.distribute_expr_lists && 
require_bucket_distribution
                                    ? tnode.distribute_expr_lists[0]
-                                   : tnode.analytic_node.partition_exprs) {
+                                   : tnode.analytic_node.partition_exprs),
+          _window(tnode.analytic_node.window),
+          _has_window(tnode.analytic_node.__isset.window),
+          _has_range_window(tnode.analytic_node.window.type == 
TAnalyticWindowType::RANGE),
+          _has_window_start(tnode.analytic_node.window.__isset.window_start),
+          _has_window_end(tnode.analytic_node.window.__isset.window_end) {
     _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
 }
 
 Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
     const TAnalyticNode& analytic_node = tnode.analytic_node;
-    size_t agg_size = analytic_node.analytic_functions.size();
-    _agg_expr_ctxs.resize(agg_size);
-    _num_agg_input.resize(agg_size);
-    for (int i = 0; i < agg_size; ++i) {
+    _agg_functions_size = analytic_node.analytic_functions.size();
+    _agg_expr_ctxs.resize(_agg_functions_size);
+    _num_agg_input.resize(_agg_functions_size);
+    for (int i = 0; i < _agg_functions_size; ++i) {
         const TExpr& desc = analytic_node.analytic_functions[i];
-        _num_agg_input[i] = desc.nodes[0].num_children;
+        vectorized::AggFnEvaluator* evaluator = nullptr;
+        // Window function treats all NullableAggregateFunction as 
AlwaysNullable.
+        // Its behavior is same with executed without group by key.
+        // https://github.com/apache/doris/pull/40693
+        RETURN_IF_ERROR(vectorized::AggFnEvaluator::create(_pool, desc, {}, 
/*without_key*/ true,
+                                                           &evaluator));
+        _agg_functions.emplace_back(evaluator);
+
         int node_idx = 0;
+        _num_agg_input[i] = desc.nodes[0].num_children;
         for (int j = 0; j < desc.nodes[0].num_children; ++j) {
             ++node_idx;
             vectorized::VExprSPtr expr;
@@ -231,7 +658,8 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state)
                                                          
_partition_by_eq_expr_ctxs));
     
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.order_by_exprs,
                                                          
_order_by_eq_expr_ctxs));
-    _agg_functions_size = agg_size;
+    
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.range_between_offset_exprs,
+                                                         
_range_between_expr_ctxs));
     return Status::OK();
 }
 
@@ -240,6 +668,18 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) {
     for (const auto& ctx : _agg_expr_ctxs) {
         RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, 
_child->row_desc()));
     }
+    _intermediate_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
+    _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
+    _change_to_nullable_flags.resize(_agg_functions_size);
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        SlotDescriptor* intermediate_slot_desc = 
_intermediate_tuple_desc->slots()[i];
+        SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
+        RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
+                                                   intermediate_slot_desc, 
output_slot_desc));
+        _agg_functions[i]->set_version(state->be_exec_version());
+        _change_to_nullable_flags[i] =
+                output_slot_desc->is_nullable() && 
(!_agg_functions[i]->data_type()->is_nullable());
+    }
     if (!_partition_by_eq_expr_ctxs.empty() || 
!_order_by_eq_expr_ctxs.empty()) {
         vector<TTupleId> tuple_ids;
         tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id());
@@ -254,11 +694,39 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) {
                     vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, 
cmp_row_desc));
         }
     }
+    if (!_range_between_expr_ctxs.empty()) {
+        DCHECK(_range_between_expr_ctxs.size() == 2);
+        RETURN_IF_ERROR(
+                vectorized::VExpr::prepare(_range_between_expr_ctxs, state, 
_child->row_desc()));
+    }
+    RETURN_IF_ERROR(vectorized::VExpr::open(_range_between_expr_ctxs, state));
     RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, 
state));
     RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state));
     for (size_t i = 0; i < _agg_functions_size; ++i) {
+        RETURN_IF_ERROR(_agg_functions[i]->open(state));
         RETURN_IF_ERROR(vectorized::VExpr::open(_agg_expr_ctxs[i], state));
     }
+
+    _offsets_of_aggregate_states.resize(_agg_functions_size);
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states;
+        const auto& agg_function = _agg_functions[i]->function();
+        // aggregate states are aligned based on maximum requirement
+        _align_aggregate_states = std::max(_align_aggregate_states, 
agg_function->align_of_data());
+        _total_size_of_aggregate_states += agg_function->size_of_data();
+        // If not the last aggregate_state, we need pad it so that next 
aggregate_state will be aligned.
+        if (i + 1 < _agg_functions_size) {
+            size_t alignment_of_next_state = _agg_functions[i + 
1]->function()->align_of_data();
+            if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 
0) {
+                return Status::RuntimeError("Logical error: align_of_data is 
not 2^N");
+            }
+            /// Extend total_size to next alignment requirement
+            /// Add padding by rounding up 'total_size_of_aggregate_states' to 
be a multiplier of alignment_of_next_state.
+            _total_size_of_aggregate_states =
+                    (_total_size_of_aggregate_states + alignment_of_next_state 
- 1) /
+                    alignment_of_next_state * alignment_of_next_state;
+        }
+    }
     return Status::OK();
 }
 
@@ -267,80 +735,139 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Block
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
-
     local_state._reserve_mem_size = 0;
     SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
+    local_state._input_eos = eos;
+    local_state._remove_unused_rows();
+    RETURN_IF_ERROR(_add_input_block(state, input_block));
+    RETURN_IF_ERROR(local_state._execute_impl());
+    if (local_state._input_eos) {
+        std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+        local_state._shared_state->sink_eos = true;
+        local_state._dependency->set_ready_to_read(); // ready for source to 
read
+    }
+    return Status::OK();
+}
 
-    local_state._shared_state->input_eos = eos;
-    if (local_state._shared_state->input_eos && input_block->rows() == 0) {
-        local_state._dependency->set_ready_to_read();
-        local_state._dependency->block();
+size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool 
eos) {
+    auto& local_state = get_local_state(state);
+    return local_state._reserve_mem_size;
+}
+
+Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state,
+                                               vectorized::Block* input_block) 
{
+    if (input_block->rows() <= 0) {
         return Status::OK();
     }
-
-    local_state._shared_state->input_block_first_row_positions.emplace_back(
-            local_state._shared_state->input_total_rows);
+    auto& local_state = get_local_state(state);
+    
local_state._input_block_first_row_positions.emplace_back(local_state._input_total_rows);
     size_t block_rows = input_block->rows();
-    local_state._shared_state->input_total_rows += block_rows;
-    local_state._shared_state->all_block_end.block_num =
-            local_state._shared_state->input_blocks.size();
-    local_state._shared_state->all_block_end.row_num = block_rows;
-    local_state._shared_state->all_block_end.pos = 
local_state._shared_state->input_total_rows;
+    local_state._input_total_rows += block_rows;
 
     // record origin columns, maybe be after this, could cast some column but 
no need to output
     auto column_to_keep = input_block->columns();
-
     {
         SCOPED_TIMER(local_state._compute_agg_data_timer);
-        for (size_t i = 0; i < _agg_functions_size;
-             ++i) { //insert _agg_input_columns, execute calculate for its
+        //insert _agg_input_columns, execute calculate for its, and those 
columns maybe could remove have used data
+        for (size_t i = 0; i < _agg_functions_size; ++i) {
             for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) {
-                RETURN_IF_ERROR(_insert_range_column(
-                        input_block, local_state._agg_expr_ctxs[i][j],
-                        
local_state._shared_state->agg_input_columns[i][j].get(), block_rows));
+                RETURN_IF_ERROR(_insert_range_column(input_block, 
local_state._agg_expr_ctxs[i][j],
+                                                     
local_state._agg_input_columns[i][j].get(),
+                                                     block_rows));
             }
         }
     }
     {
         SCOPED_TIMER(local_state._compute_partition_by_timer);
         for (size_t i = 0; i < local_state._partition_by_eq_expr_ctxs.size(); 
++i) {
-            int result_col_id = -1;
-            
RETURN_IF_ERROR(local_state._partition_by_eq_expr_ctxs[i]->execute(input_block,
-                                                                               
&result_col_id));
-            DCHECK_GE(result_col_id, 0);
-            local_state._shared_state->partition_by_column_idxs[i] = 
result_col_id;
+            RETURN_IF_ERROR(
+                    _insert_range_column(input_block, 
local_state._partition_by_eq_expr_ctxs[i],
+                                         
local_state._partition_by_columns[i].get(), block_rows));
         }
     }
-
     {
         SCOPED_TIMER(local_state._compute_order_by_timer);
         for (size_t i = 0; i < local_state._order_by_eq_expr_ctxs.size(); ++i) 
{
-            int result_col_id = -1;
+            RETURN_IF_ERROR(_insert_range_column(input_block, 
local_state._order_by_eq_expr_ctxs[i],
+                                                 
local_state._order_by_columns[i].get(),
+                                                 block_rows));
+        }
+    }
+    {
+        SCOPED_TIMER(local_state._compute_range_between_function_timer);
+        for (size_t i = 0; i < local_state._range_between_expr_ctxs.size(); 
++i) {
             RETURN_IF_ERROR(
-                    
local_state._order_by_eq_expr_ctxs[i]->execute(input_block, &result_col_id));
-            DCHECK_GE(result_col_id, 0);
-            local_state._shared_state->ordey_by_column_idxs[i] = result_col_id;
+                    _insert_range_column(input_block, 
local_state._range_between_expr_ctxs[i],
+                                         
local_state._range_result_columns[i].get(), block_rows));
         }
     }
-
     vectorized::Block::erase_useless_column(input_block, column_to_keep);
-
     COUNTER_UPDATE(local_state._memory_used_counter, 
input_block->allocated_bytes());
+    COUNTER_UPDATE(local_state._blocks_memory_usage, 
input_block->allocated_bytes());
+    local_state._input_blocks.emplace_back(std::move(*input_block));
+    return Status::OK();
+}
+
+void AnalyticSinkLocalState::_remove_unused_rows() {
+    const size_t block_num = 256;
+    if (_removed_block_index + block_num + 1 >= 
_input_block_first_row_positions.size()) {
+        return;
+    }
+    const int64_t unused_rows_pos =
+            _input_block_first_row_positions[_removed_block_index + block_num];
+
+    if (_streaming_mode) {
+        auto idx = _output_block_index - 1;
+        if (idx < 0 || _input_block_first_row_positions[idx] <= 
unused_rows_pos) {
+            return;
+        }
+    } else {
+        if (_have_removed_rows + _partition_by_pose.start <= unused_rows_pos) {
+            return;
+        }
+    }
 
-    //TODO: if need improvement, the is a tips to maintain a free queue,
-    //so the memory could reuse, no need to new/delete again;
-    
local_state._shared_state->input_blocks.emplace_back(std::move(*input_block));
+    const int64_t remove_rows = unused_rows_pos - _have_removed_rows;
+    auto left_rows = _input_total_rows - _have_removed_rows - remove_rows;
     {
-        SCOPED_TIMER(local_state._evaluation_timer);
-        local_state._shared_state->found_partition_end = 
local_state._get_partition_by_end();
+        SCOPED_TIMER(_remove_rows_timer);
+        for (size_t i = 0; i < _agg_functions_size; i++) {
+            for (size_t j = 0; j < _agg_expr_ctxs[i].size(); j++) {
+                _agg_input_columns[i][j] =
+                        _agg_input_columns[i][j]->cut(remove_rows, 
left_rows)->assume_mutable();
+            }
+        }
+        for (size_t i = 0; i < _partition_exprs_size; i++) {
+            _partition_by_columns[i] =
+                    _partition_by_columns[i]->cut(remove_rows, 
left_rows)->assume_mutable();
+        }
+        for (size_t i = 0; i < _order_by_exprs_size; i++) {
+            _order_by_columns[i] =
+                    _order_by_columns[i]->cut(remove_rows, 
left_rows)->assume_mutable();
+        }
     }
-    local_state._refresh_need_more_input();
-    return Status::OK();
-}
+    COUNTER_UPDATE(_remove_count, 1);
+    COUNTER_UPDATE(_remove_rows, remove_rows);
+    _current_row_position -= remove_rows;
+    _partition_by_pose.remove_unused_rows(remove_rows);
+    _order_by_pose.remove_unused_rows(remove_rows);
+    int64_t candidate_partition_end_size = _next_partition_ends.size();
+    while (--candidate_partition_end_size >= 0) {
+        auto peek = _next_partition_ends.front();
+        _next_partition_ends.pop();
+        _next_partition_ends.push(peek - remove_rows);
+    }
+    int64_t candidate_peer_group_end_size = _next_order_by_ends.size();
+    while (--candidate_peer_group_end_size >= 0) {
+        auto peek = _next_order_by_ends.front();
+        _next_order_by_ends.pop();
+        _next_order_by_ends.push(peek - remove_rows);
+    }
+    _removed_block_index += block_num;
+    _have_removed_rows += remove_rows;
 
-size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool 
eos) {
-    auto& local_state = get_local_state(state);
-    return local_state._reserve_mem_size;
+    DCHECK_GE(_current_row_position, 0);
+    DCHECK_GE(_partition_by_pose.end, 0);
 }
 
 Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
@@ -354,6 +881,35 @@ Status 
AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block,
     return Status::OK();
 }
 
+void AnalyticSinkLocalState::_reset_agg_status() {
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        _agg_functions[i]->reset(_fn_place_ptr + 
_offsets_of_aggregate_states[i]);
+    }
+}
+
+void AnalyticSinkLocalState::_create_agg_status() {
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        try {
+            _agg_functions[i]->create(_fn_place_ptr + 
_offsets_of_aggregate_states[i]);
+        } catch (...) {
+            for (int j = 0; j < i; ++j) {
+                _agg_functions[j]->destroy(_fn_place_ptr + 
_offsets_of_aggregate_states[j]);
+            }
+            throw;
+        }
+    }
+    _agg_functions_created = true;
+}
+
+void AnalyticSinkLocalState::_destroy_agg_status() {
+    if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) {
+        return;
+    }
+    for (size_t i = 0; i < _agg_functions_size; ++i) {
+        _agg_functions[i]->destroy(_fn_place_ptr + 
_offsets_of_aggregate_states[i]);
+    }
+}
+
 template class DataSinkOperatorX<AnalyticSinkLocalState>;
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index 67ef7df783d..3d910aaab7f 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -28,6 +28,40 @@ namespace doris {
 namespace pipeline {
 class AnalyticSinkOperatorX;
 
+struct BoundaryPose {
+    int64_t start = 0;
+    int64_t end = 0;
+    bool is_ended = false;
+    void remove_unused_rows(int64_t cnt) {
+        start -= cnt;
+        end -= cnt;
+    }
+};
+
+class PartitionStatistics {
+public:
+    void update(int64_t size) {
+        _count++;
+        _cumulative_size += size;
+        _average_size = _cumulative_size / _count;
+    }
+
+    void reset() {
+        _count = 0;
+        _cumulative_size = 0;
+        _average_size = 0;
+    }
+
+    bool is_high_cardinality() const { return _count > 16 && _average_size < 
8; }
+
+    int64_t _count = 0;
+    int64_t _cumulative_size = 0;
+    int64_t _average_size = 0;
+};
+
+// those function cacluate need partition info, so can't be used in streaming 
mode
+static const std::set<std::string> PARTITION_FUNCTION_SET {"ntile", 
"cume_dist", "percent_rank"};
+
 class AnalyticSinkLocalState : public 
PipelineXSinkLocalState<AnalyticSharedState> {
     ENABLE_FACTORY_CREATOR(AnalyticSinkLocalState);
 
@@ -37,36 +71,106 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
 
 private:
     friend class AnalyticSinkOperatorX;
-
-    bool _refresh_need_more_input() {
-        auto need_more_input = 
_whether_need_next_partition(_shared_state->found_partition_end);
-        if (need_more_input) {
-            _dependency->set_block_to_read();
-            _dependency->set_ready();
-        } else {
-            _dependency->block();
-            _dependency->set_ready_to_read();
-        }
-        return need_more_input;
+    Status _execute_impl();
+    // over(partition by k1 order by k2 range|rows unbounded preceding and 
unbounded following)
+    bool _get_next_for_partition(int64_t batch_rows, int64_t 
current_block_base_pos);
+    // over(partition by k1 order by k2 range between unbounded preceding and 
current row)
+    bool _get_next_for_unbounded_range(int64_t batch_rows, int64_t 
current_block_base_pos);
+    // over(partition by k1 order by k2 range between M preceding and N 
following)
+    bool _get_next_for_range_between(int64_t batch_rows, int64_t 
current_block_base_pos);
+    // over(partition by k1 order by k2 rows between unbounded preceding and 
current row)
+    bool _get_next_for_unbounded_rows(int64_t batch_rows, int64_t 
current_block_base_pos);
+    // over(partition by k1 order by k2 rows between M preceding and N 
following)
+    bool _get_next_for_sliding_rows(int64_t batch_rows, int64_t 
current_block_base_pos);
+
+    void _init_result_columns();
+    void _execute_for_function(int64_t partition_start, int64_t partition_end, 
int64_t frame_start,
+                               int64_t frame_end);
+    void _insert_result_info(int64_t start, int64_t end);
+    int64_t current_pos_in_block() {
+        return _current_row_position + _have_removed_rows -
+               _input_block_first_row_positions[_output_block_index];
     }
-    BlockRowPos _get_partition_by_end();
-    BlockRowPos _compare_row_to_find_end(int64_t idx, BlockRowPos start, 
BlockRowPos end,
-                                         bool need_check_first = false);
-    bool _whether_need_next_partition(BlockRowPos& found_partition_end);
-
-    RuntimeProfile::Counter* _evaluation_timer = nullptr;
-    RuntimeProfile::Counter* _compute_agg_data_timer = nullptr;
-    RuntimeProfile::Counter* _compute_partition_by_timer = nullptr;
-    RuntimeProfile::Counter* _compute_order_by_timer = nullptr;
+    void _output_current_block(vectorized::Block* block);
+    void _reset_state_for_next_partition();
+    void _refresh_buffer_and_dependency_state(vectorized::Block* block);
+
+    void _create_agg_status();
+    void _reset_agg_status();
+    void _destroy_agg_status();
+    void _remove_unused_rows();
+
+    void _get_partition_by_end();
+    void _find_next_partition_ends();
+    void _update_order_by_range();
+    void _find_next_order_by_ends();
+    int64_t find_first_not_equal(vectorized::IColumn* reference_column,
+                                 vectorized::IColumn* compared_column, int64_t 
target,
+                                 int64_t start, int64_t end);
 
     std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
     vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs;
     vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs;
+    vectorized::VExprContextSPtrs _range_between_expr_ctxs;
+    std::vector<std::vector<vectorized::MutableColumnPtr>> _agg_input_columns;
+    std::vector<vectorized::MutableColumnPtr> _partition_by_columns;
+    std::vector<vectorized::MutableColumnPtr> _order_by_columns;
+    std::vector<vectorized::MutableColumnPtr> _range_result_columns;
+    size_t _partition_exprs_size = 0;
+    size_t _order_by_exprs_size = 0;
+    BoundaryPose _partition_by_pose;
+    BoundaryPose _order_by_pose;
+    PartitionStatistics _partition_column_statistics;
+    PartitionStatistics _order_by_column_statistics;
+    std::queue<int64_t> _next_partition_ends;
+    std::queue<int64_t> _next_order_by_ends;
 
+    size_t _agg_functions_size = 0;
+    bool _agg_functions_created = false;
+    vectorized::AggregateDataPtr _fn_place_ptr = nullptr;
+    std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
+    std::vector<vectorized::AggFnEvaluator*> _agg_functions;
+    std::vector<size_t> _offsets_of_aggregate_states;
+    std::vector<bool> _result_column_nullable_flags;
+    std::vector<bool> _result_column_could_resize;
+
+    using vectorized_get_next = bool (AnalyticSinkLocalState::*)(int64_t, 
int64_t);
+    struct executor {
+        vectorized_get_next get_next_impl;
+    };
+    executor _executor;
+
+    bool _current_window_empty = false;
+    bool _streaming_mode = false;
+    int64_t _current_row_position = 0;
+    int64_t _output_block_index = 0;
+    std::vector<vectorized::MutableColumnPtr> _result_window_columns;
+
+    int64_t _rows_start_offset = 0;
+    int64_t _rows_end_offset = 0;
+    int64_t _input_total_rows = 0;
+    bool _input_eos = false;
+    std::vector<vectorized::Block> _input_blocks;
+    std::vector<int64_t> _input_block_first_row_positions;
+    int64_t _removed_block_index = 0;
+    int64_t _have_removed_rows = 0;
     int64_t _reserve_mem_size = 0;
+
+    RuntimeProfile::Counter* _evaluation_timer = nullptr;
+    RuntimeProfile::Counter* _compute_agg_data_timer = nullptr;
+    RuntimeProfile::Counter* _compute_partition_by_timer = nullptr;
+    RuntimeProfile::Counter* _compute_order_by_timer = nullptr;
+    RuntimeProfile::Counter* _compute_range_between_function_timer = nullptr;
+    RuntimeProfile::Counter* _partition_search_timer = nullptr;
+    RuntimeProfile::Counter* _order_search_timer = nullptr;
+    RuntimeProfile::Counter* _remove_rows_timer = nullptr;
+    RuntimeProfile::Counter* _remove_count = nullptr;
+    RuntimeProfile::Counter* _remove_rows = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
 };
 
 class AnalyticSinkOperatorX final : public 
DataSinkOperatorX<AnalyticSinkLocalState> {
@@ -98,23 +202,44 @@ public:
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
 private:
+    friend class AnalyticSinkLocalState;
     Status _insert_range_column(vectorized::Block* block, const 
vectorized::VExprContextSPtr& expr,
                                 vectorized::IColumn* dst_column, size_t 
length);
+    Status _add_input_block(doris::RuntimeState* state, vectorized::Block* 
input_block);
 
-    friend class AnalyticSinkLocalState;
-
+    ObjectPool* _pool = nullptr;
     std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
     vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs;
     vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs;
+    vectorized::VExprContextSPtrs _range_between_expr_ctxs;
 
     size_t _agg_functions_size = 0;
+    std::vector<size_t> _num_agg_input;
+    std::vector<vectorized::AggFnEvaluator*> _agg_functions;
 
+    TupleId _intermediate_tuple_id;
+    TupleId _output_tuple_id;
+    TupleDescriptor* _intermediate_tuple_desc = nullptr;
+    TupleDescriptor* _output_tuple_desc = nullptr;
     const TTupleId _buffered_tuple_id;
 
-    std::vector<size_t> _num_agg_input;
     const bool _is_colocate;
     const bool _require_bucket_distribution;
     const std::vector<TExpr> _partition_exprs;
+
+    TAnalyticWindow _window;
+    bool _has_window;
+    bool _has_range_window;
+    bool _has_window_start;
+    bool _has_window_end;
+
+    /// The offset of the n-th functions.
+    std::vector<size_t> _offsets_of_aggregate_states;
+    /// The total size of the row from the functions.
+    size_t _total_size_of_aggregate_states = 0;
+    /// The max align size for functions
+    size_t _align_aggregate_states = 1;
+    std::vector<bool> _change_to_nullable_flags;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 4c455b94c9c..ce6f0d1d107 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -17,6 +17,7 @@
 
 #include "analytic_source_operator.h"
 
+#include <cstddef>
 #include <string>
 
 #include "pipeline/exec/operator.h"
@@ -27,587 +28,68 @@ namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 
 AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase* 
parent)
-        : PipelineXLocalState<AnalyticSharedState>(state, parent),
-          _output_block_index(0),
-          _window_end_position(0),
-          _next_partition(false),
-          _rows_start_offset(0),
-          _rows_end_offset(0),
-          _fn_place_ptr(nullptr),
-          _agg_functions_size(0),
-          _agg_functions_created(false),
-          _agg_arena_pool(std::make_unique<vectorized::Arena>()) {}
-
-//_partition_by_columns,_order_by_columns save in blocks, so if need to 
calculate the boundary, may find in which blocks firstly
-BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int64_t idx, 
BlockRowPos start,
-                                                         BlockRowPos end, bool 
need_check_first) {
-    auto& shared_state = *_shared_state;
-    int64_t start_init_row_num = start.row_num;
-    vectorized::ColumnPtr start_column =
-            
shared_state.input_blocks[start.block_num].get_by_position(idx).column;
-    vectorized::ColumnPtr start_next_block_column = start_column;
-
-    DCHECK_LE(start.block_num, end.block_num);
-    DCHECK_LE(start.block_num, shared_state.input_blocks.size() - 1);
-    int64_t start_block_num = start.block_num;
-    int64_t end_block_num = end.block_num;
-    int64_t mid_blcok_num = end.block_num;
-    // To fix this problem: https://github.com/apache/doris/issues/15951
-    // in this case, the partition by column is last row of block, so it's 
pointed to a new block at row = 0, range is: [left, right)
-    // From the perspective of order by column, the two values are exactly 
equal.
-    // so the range will be get wrong because it's compare_at == 0 with next 
block at row = 0
-    if (need_check_first && end.block_num > 0 && end.row_num == 0) {
-        end.block_num--;
-        end_block_num--;
-        end.row_num = shared_state.input_blocks[end_block_num].rows();
-    }
-    //binary search find in which block
-    while (start_block_num < end_block_num) {
-        mid_blcok_num = (start_block_num + end_block_num + 1) >> 1;
-        start_next_block_column =
-                
shared_state.input_blocks[mid_blcok_num].get_by_position(idx).column;
-        //Compares (*this)[n] and rhs[m], this: start[init_row]  rhs: mid[0]
-        if (start_column->compare_at(start_init_row_num, 0, 
*start_next_block_column, 1) == 0) {
-            start_block_num = mid_blcok_num;
-        } else {
-            end_block_num = mid_blcok_num - 1;
-        }
-    }
-
-    // have check the start.block_num:  start_column[start_init_row_num] with 
mid_blcok_num start_next_block_column[0]
-    // now next block must not be result, so need check with end_block_num: 
start_next_block_column[last_row]
-    if (end_block_num == mid_blcok_num - 1) {
-        start_next_block_column =
-                
shared_state.input_blocks[end_block_num].get_by_position(idx).column;
-        int64_t block_size = shared_state.input_blocks[end_block_num].rows();
-        if ((start_column->compare_at(start_init_row_num, block_size - 1, 
*start_next_block_column,
-                                      1) == 0)) {
-            start.block_num = end_block_num + 1;
-            start.row_num = 0;
-            return start;
-        }
-    }
-
-    //check whether need get column again, maybe same as first init
-    // if the start_block_num have move to forword, so need update start block 
num and compare it from row_num=0
-    if (start_block_num != start.block_num) {
-        start_init_row_num = 0;
-        start.block_num = start_block_num;
-        start_column = 
shared_state.input_blocks[start.block_num].get_by_position(idx).column;
-    }
-    //binary search, set start and end pos
-    int64_t start_pos = start_init_row_num;
-    int64_t end_pos = shared_state.input_blocks[start.block_num].rows();
-    //if end_block_num haven't moved, only start_block_num go to the end block
-    //so could use the end.row_num for binary search
-    if (start.block_num == end.block_num) {
-        end_pos = end.row_num;
-    }
-    while (start_pos < end_pos) {
-        int64_t mid_pos = (start_pos + end_pos) >> 1;
-        if (start_column->compare_at(start_init_row_num, mid_pos, 
*start_column, 1)) {
-            end_pos = mid_pos;
-        } else {
-            start_pos = mid_pos + 1;
-        }
-    }
-    start.row_num = start_pos; //update row num, return the find end
-    return start;
-}
-
-BlockRowPos AnalyticLocalState::_get_partition_by_end() {
-    auto& shared_state = *_shared_state;
-    if (shared_state.current_row_position <
-        shared_state.partition_by_end.pos) { //still have data, return 
partition_by_end directly
-        return shared_state.partition_by_end;
-    }
-
-    const auto partition_exprs_size =
-            _parent->cast<AnalyticSourceOperatorX>()._partition_exprs_size;
-    if (partition_exprs_size == 0 ||
-        (shared_state.input_total_rows == 0)) { //no partition_by, the all 
block is end
-        return shared_state.all_block_end;
-    }
-
-    BlockRowPos cal_end = shared_state.all_block_end;
-    for (size_t i = 0; i < partition_exprs_size;
-         ++i) { //have partition_by, binary search the partiton end
-        cal_end = 
_compare_row_to_find_end(shared_state.partition_by_column_idxs[i],
-                                           shared_state.partition_by_end, 
cal_end);
-    }
-    cal_end.pos = 
shared_state.input_block_first_row_positions[cal_end.block_num] + 
cal_end.row_num;
-    return cal_end;
-}
-
-bool AnalyticLocalState::_whether_need_next_partition(BlockRowPos& 
found_partition_end) {
-    auto& shared_state = *_shared_state;
-    if (shared_state.input_eos ||
-        (shared_state.current_row_position <
-         shared_state.partition_by_end.pos)) { //now still have partition data
-        return false;
-    }
-    const auto partition_exprs_size =
-            _parent->cast<AnalyticSourceOperatorX>()._partition_exprs_size;
-    if ((partition_exprs_size == 0 && !shared_state.input_eos) ||
-        (found_partition_end.pos == 0)) { //no partition, get until fetch to 
EOS
-        return true;
-    }
-    if (partition_exprs_size != 0 && found_partition_end.pos == 
shared_state.all_block_end.pos &&
-        !shared_state.input_eos) { //current partition data calculate done
-        return true;
-    }
-    return false;
-}
+        : PipelineXLocalState<AnalyticSharedState>(state, parent) {}
 
 Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::init(state, 
info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
-    _blocks_memory_usage =
-            profile()->AddHighWaterMarkCounter("MemoryUsageBlocks", 
TUnit::BYTES, "", 1);
-    _evaluation_timer = ADD_TIMER(profile(), "GetPartitionBoundTime");
-    _execute_timer = ADD_TIMER(profile(), "ExecuteTime");
     _get_next_timer = ADD_TIMER(profile(), "GetNextTime");
-    _get_result_timer = ADD_TIMER(profile(), "GetResultsTime");
-    return Status::OK();
-}
-
-Status AnalyticLocalState::open(RuntimeState* state) {
-    RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::open(state));
-    SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
-
-    auto& p = _parent->cast<AnalyticSourceOperatorX>();
-    _agg_functions_size = p._agg_functions.size();
-    _offsets_of_aggregate_states.resize(_agg_functions_size);
-    _result_column_nullable_flags.resize(_agg_functions_size);
-
-    _agg_functions.resize(p._agg_functions.size());
-    for (size_t i = 0; i < _agg_functions.size(); i++) {
-        _agg_functions[i] = p._agg_functions[i]->clone(state, 
state->obj_pool());
-        _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i];
-        _result_column_nullable_flags[i] =
-                
!_agg_functions[i]->function()->get_return_type()->is_nullable() &&
-                _agg_functions[i]->data_type()->is_nullable();
-    }
-
-    _fn_place_ptr = 
_agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states,
-                                                   p._align_aggregate_states);
-
-    if (!p._has_window) { //haven't set window, Unbounded:  [unbounded 
preceding,unbounded following]
-        _executor.get_next = 
std::bind<Status>(&AnalyticLocalState::_get_next_for_partition, this,
-                                               std::placeholders::_1);
-
-    } else if (p._has_range_window) {
-        if (!p._has_window_end) { //haven't set end, so same as PARTITION, 
[unbounded preceding, unbounded following]
-            _executor.get_next = 
std::bind<Status>(&AnalyticLocalState::_get_next_for_partition,
-                                                   this, 
std::placeholders::_1);
-
-        } else {
-            _executor.get_next = 
std::bind<Status>(&AnalyticLocalState::_get_next_for_range, this,
-                                                   std::placeholders::_1);
-        }
-
-    } else {
-        if (!p._has_window_start &&
-            !p._has_window_end) { //haven't set start and end, same as 
PARTITION
-            _executor.get_next = 
std::bind<Status>(&AnalyticLocalState::_get_next_for_partition,
-                                                   this, 
std::placeholders::_1);
-
-        } else {
-            if (p._has_window_start) { //calculate start boundary
-                TAnalyticWindowBoundary b = p._window.window_start;
-                if (b.__isset.rows_offset_value) { //[offset     ,   ]
-                    _rows_start_offset = b.rows_offset_value;
-                    if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
-                        _rows_start_offset *= -1; //preceding--> negative
-                    }                             //current_row  0
-                } else {                          //following    positive
-                    DCHECK_EQ(b.type, 
TAnalyticWindowBoundaryType::CURRENT_ROW); //[current row,   ]
-                    _rows_start_offset = 0;
-                }
-            }
-
-            if (p._has_window_end) { //calculate end boundary
-                TAnalyticWindowBoundary b = p._window.window_end;
-                if (b.__isset.rows_offset_value) { //[       , offset]
-                    _rows_end_offset = b.rows_offset_value;
-                    if (b.type == TAnalyticWindowBoundaryType::PRECEDING) {
-                        _rows_end_offset *= -1;
-                    }
-                } else {
-                    DCHECK_EQ(b.type, 
TAnalyticWindowBoundaryType::CURRENT_ROW); //[   ,current row]
-                    _rows_end_offset = 0;
-                }
-            }
-
-            _executor.get_next = 
std::bind<Status>(&AnalyticLocalState::_get_next_for_rows, this,
-                                                   std::placeholders::_1);
-        }
-    }
-    _create_agg_status();
-    return Status::OK();
-}
-
-void AnalyticLocalState::_reset_agg_status() {
-    for (size_t i = 0; i < _agg_functions_size; ++i) {
-        _agg_functions[i]->reset(
-                _fn_place_ptr +
-                
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i]);
-    }
-}
-
-void AnalyticLocalState::_create_agg_status() {
-    for (size_t i = 0; i < _agg_functions_size; ++i) {
-        try {
-            _agg_functions[i]->create(
-                    _fn_place_ptr +
-                    
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i]);
-        } catch (...) {
-            for (int j = 0; j < i; ++j) {
-                _agg_functions[j]->destroy(
-                        _fn_place_ptr +
-                        
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[j]);
-            }
-            throw;
-        }
-    }
-    _agg_functions_created = true;
-}
-
-void AnalyticLocalState::_destroy_agg_status() {
-    if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) {
-        return;
-    }
-    for (size_t i = 0; i < _agg_functions_size; ++i) {
-        _agg_functions[i]->destroy(
-                _fn_place_ptr +
-                
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i]);
-    }
-}
-
-void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, 
int64_t partition_end,
-                                               int64_t frame_start, int64_t 
frame_end) {
-    for (size_t i = 0; i < _agg_functions_size; ++i) {
-        std::vector<const vectorized::IColumn*> agg_columns;
-        for (int j = 0; j < _shared_state->agg_input_columns[i].size(); ++j) {
-            
agg_columns.push_back(_shared_state->agg_input_columns[i][j].get());
-        }
-        _agg_functions[i]->function()->add_range_single_place(
-                partition_start, partition_end, frame_start, frame_end,
-                _fn_place_ptr + _offsets_of_aggregate_states[i], 
agg_columns.data(),
-                _agg_arena_pool.get());
-
-        // If the end is not greater than the start, the current window should 
be empty.
-        _current_window_empty =
-                std::min(frame_end, partition_end) <= std::max(frame_start, 
partition_start);
-    }
-}
-
-void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) {
-    int64_t current_block_row_pos =
-            
_shared_state->input_block_first_row_positions[_output_block_index];
-    int64_t get_result_start = _shared_state->current_row_position - 
current_block_row_pos;
-    if (_parent->cast<AnalyticSourceOperatorX>()._fn_scope == 
AnalyticFnScope::PARTITION) {
-        int64_t get_result_end =
-                std::min<int64_t>(_shared_state->current_row_position + 
current_block_rows,
-                                  _shared_state->partition_by_end.pos);
-        _window_end_position =
-                std::min<int64_t>(get_result_end - current_block_row_pos, 
current_block_rows);
-        _shared_state->current_row_position += (_window_end_position - 
get_result_start);
-    } else if (_parent->cast<AnalyticSourceOperatorX>()._fn_scope == 
AnalyticFnScope::RANGE) {
-        _window_end_position =
-                std::min<int64_t>(_order_by_end.pos - current_block_row_pos, 
current_block_rows);
-        _shared_state->current_row_position += (_window_end_position - 
get_result_start);
-    } else {
-        _window_end_position++;
-        _shared_state->current_row_position++;
-    }
-
-    for (size_t i = 0; i < _agg_functions_size; ++i) {
-        for (size_t j = get_result_start; j < _window_end_position; ++j) {
-            if (_result_column_nullable_flags[i]) {
-                if (_current_window_empty) {
-                    _result_window_columns[i]->insert_default();
-                } else {
-                    auto* dst = assert_cast<vectorized::ColumnNullable*>(
-                            _result_window_columns[i].get());
-                    dst->get_null_map_data().push_back(0);
-                    _agg_functions[i]->insert_result_info(
-                            _fn_place_ptr + _offsets_of_aggregate_states[i],
-                            &dst->get_nested_column());
-                }
-            } else {
-                _agg_functions[i]->insert_result_info(
-                        _fn_place_ptr + _offsets_of_aggregate_states[i],
-                        _result_window_columns[i].get());
-            }
-        }
-    }
-}
-
-Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) {
-    SCOPED_TIMER(_get_next_timer);
-    while (_shared_state->current_row_position < 
_shared_state->partition_by_end.pos &&
-           _window_end_position < current_block_rows) {
-        int64_t range_start, range_end;
-        if 
(!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start &&
-            _parent->cast<AnalyticSourceOperatorX>()._window.window_end.type ==
-                    TAnalyticWindowBoundaryType::CURRENT_ROW) {
-            // [preceding, current_row], [current_row, following] rewrite it's 
same
-            // as could reuse the previous calculate result, so don't call 
_reset_agg_status function
-            // going on calculate, add up data, no need to reset state
-            range_start = _shared_state->current_row_position;
-            range_end = _shared_state->current_row_position + 1;
-        } else {
-            _reset_agg_status();
-            range_end = _shared_state->current_row_position + _rows_end_offset 
+ 1;
-            //[preceding, offset]        --unbound: [preceding, following]
-            if 
(!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start) {
-                range_start = _partition_by_start.pos;
-            } else {
-                range_start = _shared_state->current_row_position + 
_rows_start_offset;
-            }
-            // Make sure range_start <= range_end
-            range_start = std::min(range_start, range_end);
-        }
-        _execute_for_win_func(_partition_by_start.pos, 
_shared_state->partition_by_end.pos,
-                              range_start, range_end);
-        _insert_result_info(current_block_rows);
-    }
-    return Status::OK();
-}
-
-Status AnalyticLocalState::_get_next_for_partition(size_t current_block_rows) {
-    SCOPED_TIMER(_get_next_timer);
-    if (_next_partition) {
-        _execute_for_win_func(_partition_by_start.pos, 
_shared_state->partition_by_end.pos,
-                              _partition_by_start.pos, 
_shared_state->partition_by_end.pos);
-    }
-    _insert_result_info(current_block_rows);
-    return Status::OK();
-}
-
-Status AnalyticLocalState::_get_next_for_range(size_t current_block_rows) {
-    SCOPED_TIMER(_get_next_timer);
-    while (_shared_state->current_row_position < 
_shared_state->partition_by_end.pos &&
-           _window_end_position < current_block_rows) {
-        if (_shared_state->current_row_position >= _order_by_end.pos) {
-            _update_order_by_range();
-            _execute_for_win_func(_partition_by_start.pos, 
_shared_state->partition_by_end.pos,
-                                  _order_by_start.pos, _order_by_end.pos);
-        }
-        _insert_result_info(current_block_rows);
-    }
-    return Status::OK();
-}
-
-void AnalyticLocalState::_update_order_by_range() {
-    _order_by_start = _order_by_end;
-    _order_by_end = _shared_state->partition_by_end;
-    for (size_t i = 0; i < 
_parent->cast<AnalyticSourceOperatorX>()._order_by_exprs_size; ++i) {
-        _order_by_end = 
_compare_row_to_find_end(_shared_state->ordey_by_column_idxs[i],
-                                                 _order_by_start, 
_order_by_end, true);
-    }
-    _order_by_start.pos =
-            
_shared_state->input_block_first_row_positions[_order_by_start.block_num] +
-            _order_by_start.row_num;
-    _order_by_end.pos = 
_shared_state->input_block_first_row_positions[_order_by_end.block_num] +
-                        _order_by_end.row_num;
-    // `_order_by_end` will be assigned to `_order_by_start` next time,
-    // so make it a valid position.
-    if (_order_by_end.row_num == 
_shared_state->input_blocks[_order_by_end.block_num].rows()) {
-        _order_by_end.block_num++;
-        _order_by_end.row_num = 0;
-    }
-}
-
-void AnalyticLocalState::init_result_columns() {
-    if (!_window_end_position) {
-        _result_window_columns.resize(_agg_functions_size);
-        for (size_t i = 0; i < _agg_functions_size; ++i) {
-            _result_window_columns[i] =
-                    _agg_functions[i]->data_type()->create_column(); //return 
type
-        }
-    }
-}
-
-//calculate pos have arrive partition end, so it's needed to init next 
partition, and update the boundary of partition
-bool AnalyticLocalState::init_next_partition(BlockRowPos found_partition_end) {
-    if ((_shared_state->current_row_position >= 
_shared_state->partition_by_end.pos) &&
-        ((_shared_state->partition_by_end.pos == 0) ||
-         (_shared_state->partition_by_end.pos != found_partition_end.pos))) {
-        _partition_by_start = _shared_state->partition_by_end;
-        _shared_state->partition_by_end = found_partition_end;
-        _shared_state->current_row_position = _partition_by_start.pos;
-        _reset_agg_status();
-        return true;
-    }
-    return false;
-}
-
-Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
-    block->swap(std::move(_shared_state->input_blocks[_output_block_index]));
-    _blocks_memory_usage->add(block->allocated_bytes());
-
-    
DCHECK(_parent->cast<AnalyticSourceOperatorX>()._change_to_nullable_flags.size()
 ==
-           _result_window_columns.size());
-    for (size_t i = 0; i < _result_window_columns.size(); ++i) {
-        if 
(_parent->cast<AnalyticSourceOperatorX>()._change_to_nullable_flags[i]) {
-            block->insert({make_nullable(std::move(_result_window_columns[i])),
-                           make_nullable(_agg_functions[i]->data_type()), ""});
-        } else {
-            block->insert(
-                    {std::move(_result_window_columns[i]), 
_agg_functions[i]->data_type(), ""});
-        }
-    }
-
-    _output_block_index++;
-    _window_end_position = 0;
-
+    _filtered_rows_counter = ADD_COUNTER(profile(), "FilteredRows", 
TUnit::UNIT);
     return Status::OK();
 }
 
 AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const 
TPlanNode& tnode,
                                                  int operator_id, const 
DescriptorTbl& descs)
-        : OperatorX<AnalyticLocalState>(pool, tnode, operator_id, descs),
-          _window(tnode.analytic_node.window),
-          _intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id),
-          _output_tuple_id(tnode.analytic_node.output_tuple_id),
-          _has_window(tnode.analytic_node.__isset.window),
-          _has_range_window(tnode.analytic_node.window.type == 
TAnalyticWindowType::RANGE),
-          _has_window_start(tnode.analytic_node.window.__isset.window_start),
-          _has_window_end(tnode.analytic_node.window.__isset.window_end),
-          _partition_exprs_size(tnode.analytic_node.partition_exprs.size()),
-          _order_by_exprs_size(tnode.analytic_node.order_by_exprs.size()) {
+        : OperatorX<AnalyticLocalState>(pool, tnode, operator_id, descs) {
     _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
-    _fn_scope = AnalyticFnScope::PARTITION;
-    if (tnode.analytic_node.__isset.window &&
-        tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
-        DCHECK(!_window.__isset.window_start) << "RANGE windows must have 
UNBOUNDED PRECEDING";
-        DCHECK(!_window.__isset.window_end ||
-               _window.window_end.type == 
TAnalyticWindowBoundaryType::CURRENT_ROW)
-                << "RANGE window end bound must be CURRENT ROW or UNBOUNDED 
FOLLOWING";
-
-        if (_window.__isset
-                    .window_end) { //haven't set end, so same as PARTITION, 
[unbounded preceding, unbounded following]
-            _fn_scope = AnalyticFnScope::RANGE; //range:  [unbounded 
preceding,current row]
-        }
-
-    } else if (tnode.analytic_node.__isset.window) {
-        if (_window.__isset.window_start || _window.__isset.window_end) {
-            _fn_scope = AnalyticFnScope::ROWS;
-        }
-    }
 }
 
-Status AnalyticSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
-    RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::init(tnode, state));
-    const TAnalyticNode& analytic_node = tnode.analytic_node;
-    size_t agg_size = analytic_node.analytic_functions.size();
-    for (int i = 0; i < agg_size; ++i) {
-        vectorized::AggFnEvaluator* evaluator = nullptr;
-        // Window function treats all NullableAggregateFunction as 
AlwaysNullable.
-        // Its behavior is same with executed without group by key.
-        // https://github.com/apache/doris/pull/40693
-        RETURN_IF_ERROR(vectorized::AggFnEvaluator::create(
-                _pool, analytic_node.analytic_functions[i], {}, /*wihout_key*/ 
true, &evaluator));
-        _agg_functions.emplace_back(evaluator);
-    }
-
-    return Status::OK();
-}
-
-Status AnalyticSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
+Status AnalyticSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* output_block,
                                           bool* eos) {
+    RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
-    if (local_state._shared_state->input_eos &&
-        (local_state._output_block_index == 
local_state._shared_state->input_blocks.size() ||
-         local_state._shared_state->input_total_rows == 0)) {
-        *eos = true;
-        return Status::OK();
-    }
-
-    while (!local_state._shared_state->input_eos ||
-           local_state._output_block_index < 
local_state._shared_state->input_blocks.size()) {
-        {
-            SCOPED_TIMER(local_state._evaluation_timer);
-            local_state._shared_state->found_partition_end = 
local_state._get_partition_by_end();
-        }
-        if (local_state._refresh_need_more_input()) {
-            return Status::OK();
-        }
-        local_state._next_partition =
-                
local_state.init_next_partition(local_state._shared_state->found_partition_end);
-        local_state.init_result_columns();
-        size_t current_block_rows =
-                
local_state._shared_state->input_blocks[local_state._output_block_index].rows();
-        RETURN_IF_ERROR(local_state._executor.get_next(current_block_rows));
-        if (local_state._window_end_position == current_block_rows) {
-            break;
+    SCOPED_TIMER(local_state._get_next_timer);
+    output_block->clear_column_data();
+    size_t output_rows = 0;
+    {
+        std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
+        if (!local_state._shared_state->blocks_buffer.empty()) {
+            
local_state._shared_state->blocks_buffer.front().swap(*output_block);
+            local_state._shared_state->blocks_buffer.pop();
+            output_rows = output_block->rows();
+            RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block,
+                                                     output_block->columns()));
+            // Buffer drained and sink not at eos yet: block this source, let the sink write more.
+            if (local_state._shared_state->blocks_buffer.empty() &&
+                !local_state._shared_state->sink_eos) {
+                // Re-check sink_eos under sink_eos_lock: the sink sets eos and then wakes this
+                // source, and an unguarded block() issued after that wake-up would undo it.
+                std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+                if (!local_state._shared_state->sink_eos) {
+                    local_state._dependency->block();              // block 
self source
+                    local_state._dependency->set_ready_to_write(); // ready 
for sink write
+                }
+            }
+        } else {
+            // Buffer is empty: this source is finished only once the sink has reached eos.
+            std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+            *eos = local_state._shared_state->sink_eos;
         }
     }
-    RETURN_IF_ERROR(local_state.output_current_block(block));
-    RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, 
block->columns()));
-    local_state.reached_limit(block, eos);
-    return Status::OK();
-}
-
-Status AnalyticLocalState::close(RuntimeState* state) {
-    SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_close_timer);
-    if (_closed) {
-        return Status::OK();
+    // Rows dropped by the conjuncts, counted before reached_limit() may truncate the block.
+    const size_t filtered_rows = output_rows - output_block->rows();
+    local_state.reached_limit(output_block, eos); // also maintains _num_rows_returned
+    if (filtered_rows > 0) {
+        COUNTER_UPDATE(local_state._filtered_rows_counter, static_cast<int64_t>(filtered_rows));
     }
-
-    _destroy_agg_status();
-    _agg_arena_pool = nullptr;
-
-    std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
-    _result_window_columns.swap(tmp_result_window_columns);
-    return PipelineXLocalState<AnalyticSharedState>::close(state);
+    return Status::OK();
 }
 
 Status AnalyticSourceOperatorX::open(RuntimeState* state) {
     RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
     DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
-    _intermediate_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
-    _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
-    for (size_t i = 0; i < _agg_functions.size(); ++i) {
-        SlotDescriptor* intermediate_slot_desc = 
_intermediate_tuple_desc->slots()[i];
-        SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
-        RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
-                                                   intermediate_slot_desc, 
output_slot_desc));
-        _agg_functions[i]->set_version(state->be_exec_version());
-        _change_to_nullable_flags.push_back(output_slot_desc->is_nullable() &&
-                                            
!_agg_functions[i]->data_type()->is_nullable());
-    }
-
-    _offsets_of_aggregate_states.resize(_agg_functions.size());
-    for (size_t i = 0; i < _agg_functions.size(); ++i) {
-        _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states;
-        const auto& agg_function = _agg_functions[i]->function();
-        // aggregate states are aligned based on maximum requirement
-        _align_aggregate_states = std::max(_align_aggregate_states, 
agg_function->align_of_data());
-        _total_size_of_aggregate_states += agg_function->size_of_data();
-        // If not the last aggregate_state, we need pad it so that next 
aggregate_state will be aligned.
-        if (i + 1 < _agg_functions.size()) {
-            size_t alignment_of_next_state = _agg_functions[i + 
1]->function()->align_of_data();
-            if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 
0) {
-                return Status::RuntimeError("Logical error: align_of_data is 
not 2^N");
-            }
-            /// Extend total_size to next alignment requirement
-            /// Add padding by rounding up 'total_size_of_aggregate_states' to 
be a multiplier of alignment_of_next_state.
-            _total_size_of_aggregate_states =
-                    (_total_size_of_aggregate_states + alignment_of_next_state 
- 1) /
-                    alignment_of_next_state * alignment_of_next_state;
-        }
-    }
-    for (auto* agg_function : _agg_functions) {
-        RETURN_IF_ERROR(agg_function->open(state));
-    }
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/analytic_source_operator.h 
b/be/src/pipeline/exec/analytic_source_operator.h
index 639a27ffb7c..be1fdb2c9e5 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -27,89 +27,18 @@ class RuntimeState;
 
 namespace pipeline {
 #include "common/compile_check_begin.h"
-enum AnalyticFnScope { PARTITION, RANGE, ROWS };
 
 class AnalyticSourceOperatorX;
 class AnalyticLocalState final : public 
PipelineXLocalState<AnalyticSharedState> {
 public:
     ENABLE_FACTORY_CREATOR(AnalyticLocalState);
     AnalyticLocalState(RuntimeState* state, OperatorXBase* parent);
-
     Status init(RuntimeState* state, LocalStateInfo& info) override;
-    Status open(RuntimeState* state) override;
-    Status close(RuntimeState* state) override;
-
-    void init_result_columns();
-
-    Status output_current_block(vectorized::Block* block);
-
-    bool init_next_partition(BlockRowPos found_partition_end);
 
 private:
-    Status _get_next_for_rows(size_t rows);
-    Status _get_next_for_range(size_t rows);
-    Status _get_next_for_partition(size_t rows);
-
-    void _execute_for_win_func(int64_t partition_start, int64_t partition_end, 
int64_t frame_start,
-                               int64_t frame_end);
-    void _insert_result_info(int64_t current_block_rows);
-
-    void _update_order_by_range();
-    bool _refresh_need_more_input() {
-        auto need_more_input = 
_whether_need_next_partition(_shared_state->found_partition_end);
-        if (need_more_input) {
-            _dependency->block();
-            _dependency->set_ready_to_write();
-        } else {
-            _dependency->set_block_to_write();
-            _dependency->set_ready();
-        }
-        return need_more_input;
-    }
-    BlockRowPos _get_partition_by_end();
-    BlockRowPos _compare_row_to_find_end(int64_t idx, BlockRowPos start, 
BlockRowPos end,
-                                         bool need_check_first = false);
-    bool _whether_need_next_partition(BlockRowPos& found_partition_end);
-
-    void _reset_agg_status();
-    void _create_agg_status();
-    void _destroy_agg_status();
-
     friend class AnalyticSourceOperatorX;
-
-    int64_t _output_block_index;
-    int64_t _window_end_position;
-    bool _next_partition;
-    std::vector<vectorized::MutableColumnPtr> _result_window_columns;
-
-    int64_t _rows_start_offset;
-    int64_t _rows_end_offset;
-    vectorized::AggregateDataPtr _fn_place_ptr;
-    size_t _agg_functions_size;
-    bool _agg_functions_created;
-    bool _current_window_empty = false;
-
-    BlockRowPos _order_by_start;
-    BlockRowPos _order_by_end;
-    BlockRowPos _partition_by_start;
-    std::unique_ptr<vectorized::Arena> _agg_arena_pool;
-    std::vector<vectorized::AggFnEvaluator*> _agg_functions;
-    std::vector<size_t> _offsets_of_aggregate_states;
-    std::vector<bool> _result_column_nullable_flags;
-
-    RuntimeProfile::Counter* _evaluation_timer = nullptr;
-    RuntimeProfile::Counter* _execute_timer = nullptr;
     RuntimeProfile::Counter* _get_next_timer = nullptr;
-    RuntimeProfile::Counter* _get_result_timer = nullptr;
-    RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr;
-
-    using vectorized_get_next = std::function<Status(size_t rows)>;
-
-    struct executor {
-        vectorized_get_next get_next;
-    };
-
-    executor _executor;
+    RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
 };
 
 class AnalyticSourceOperatorX final : public OperatorX<AnalyticLocalState> {
@@ -121,39 +50,10 @@ public:
 
     bool is_source() const override { return true; }
 
-    Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status open(RuntimeState* state) override;
 
 private:
     friend class AnalyticLocalState;
-
-    TAnalyticWindow _window;
-
-    TupleId _intermediate_tuple_id;
-    TupleId _output_tuple_id;
-
-    bool _has_window;
-    bool _has_range_window;
-    bool _has_window_start;
-    bool _has_window_end;
-
-    std::vector<vectorized::AggFnEvaluator*> _agg_functions;
-
-    AnalyticFnScope _fn_scope;
-
-    TupleDescriptor* _intermediate_tuple_desc = nullptr;
-    TupleDescriptor* _output_tuple_desc = nullptr;
-
-    /// The offset of the n-th functions.
-    std::vector<size_t> _offsets_of_aggregate_states;
-    /// The total size of the row from the functions.
-    size_t _total_size_of_aggregate_states = 0;
-    /// The max align size for functions
-    size_t _align_aggregate_states = 1;
-
-    std::vector<bool> _change_to_nullable_flags;
-    const size_t _partition_exprs_size;
-    const size_t _order_by_exprs_size;
 };
 
 } // namespace pipeline
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h 
b/be/src/vec/aggregate_functions/aggregate_function.h
index d761d40c4c9..9a25056d213 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -233,6 +233,19 @@ public:
     virtual Status verify_result_type(const bool without_key, const DataTypes& 
argument_types,
                                       const DataTypePtr result_type) const = 0;
 
+    // Aggregate functions normally append results to the result column via push_back.
+    // Some callers instead resize the result column up front and write rows via operator[].
+    // Not every column supports that (e.g. a string column cannot be resized and filled via
+    // operator[]), so such types need a dedicated specialization of insert_result_into_range.
+    virtual bool result_column_could_resize() const { return false; }
+
+    virtual void insert_result_into_range(ConstAggregateDataPtr __restrict 
place, IColumn& to,
+                                          const size_t start, const size_t 
end) const {
+        for (size_t i = start; i < end; ++i) {
+            insert_result_into(place, to);
+        }
+    }
+
 protected:
     DataTypes argument_types;
     int version {};
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h 
b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 5d449318b7d..4b2dfb4d65b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -76,7 +76,18 @@ public:
     }
 
     void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const 
override {
-        assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).count);
+        assert_cast<ColumnInt64&, 
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+                doris::vectorized::WindowFunctionRowNumber::data(place).count);
+    }
+
+    bool result_column_could_resize() const override { return true; }
+
+    void insert_result_into_range(ConstAggregateDataPtr __restrict place, 
IColumn& to,
+                                  const size_t start, const size_t end) const 
override {
+        auto& column = assert_cast<ColumnInt64&, 
TypeCheckOnRelease::DISABLE>(to);
+        for (size_t i = start; i < end; ++i) {
+            column.get_data()[i] = 
(doris::vectorized::WindowFunctionRowNumber::data(place).count);
+        }
     }
 
     void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) 
const override {}
@@ -86,8 +97,8 @@ public:
 
 struct RankData {
     int64_t rank = 0;
-    int64_t count = 0;
-    int64_t peer_group_start = 0;
+    int64_t count = 1;
+    int64_t peer_group_start = -1;
 };
 
 class WindowFunctionRank final : public IAggregateFunctionDataHelper<RankData, 
WindowFunctionRank> {
@@ -121,7 +132,18 @@ public:
     }
 
     void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const 
override {
-        assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).rank);
+        assert_cast<ColumnInt64&, 
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+                data(place).rank);
+    }
+
+    bool result_column_could_resize() const override { return true; }
+
+    void insert_result_into_range(ConstAggregateDataPtr __restrict place, 
IColumn& to,
+                                  const size_t start, const size_t end) const 
override {
+        auto& column = assert_cast<ColumnInt64&, 
TypeCheckOnRelease::DISABLE>(to);
+        for (size_t i = start; i < end; ++i) {
+            column.get_data()[i] = 
(doris::vectorized::WindowFunctionRank::data(place).rank);
+        }
     }
 
     void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) 
const override {}
@@ -131,7 +153,7 @@ public:
 
 struct DenseRankData {
     int64_t rank = 0;
-    int64_t peer_group_start = 0;
+    int64_t peer_group_start = -1;
 };
 
 class WindowFunctionDenseRank final
@@ -163,7 +185,18 @@ public:
     }
 
     void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const 
override {
-        assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).rank);
+        assert_cast<ColumnInt64&, 
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+                data(place).rank);
+    }
+
+    bool result_column_could_resize() const override { return true; }
+
+    void insert_result_into_range(ConstAggregateDataPtr __restrict place, 
IColumn& to,
+                                  const size_t start, const size_t end) const 
override {
+        auto& column = assert_cast<ColumnInt64&, 
TypeCheckOnRelease::DISABLE>(to);
+        for (size_t i = start; i < end; ++i) {
+            column.get_data()[i] = 
(doris::vectorized::WindowFunctionDenseRank::data(place).rank);
+        }
     }
 
     void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) 
const override {}
@@ -173,8 +206,8 @@ public:
 
 struct PercentRankData {
     int64_t rank = 0;
-    int64_t count = 0;
-    int64_t peer_group_start = 0;
+    int64_t count = 1;
+    int64_t peer_group_start = -1;
     int64_t partition_size = 0;
 };
 
@@ -219,7 +252,19 @@ public:
 
     void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const 
override {
         auto percent_rank = _cal_percent(data(place).rank, 
data(place).partition_size);
-        assert_cast<ColumnFloat64&>(to).get_data().push_back(percent_rank);
+        assert_cast<ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+                percent_rank);
+    }
+
+    bool result_column_could_resize() const override { return true; }
+
+    void insert_result_into_range(ConstAggregateDataPtr __restrict place, 
IColumn& to,
+                                  const size_t start, const size_t end) const 
override {
+        auto& column = assert_cast<ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(to);
+        auto percent_rank = _cal_percent(data(place).rank, 
data(place).partition_size);
+        for (size_t i = start; i < end; ++i) {
+            column.get_data()[i] = percent_rank;
+        }
     }
 
     void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) 
const override {}
@@ -230,7 +275,7 @@ public:
 struct CumeDistData {
     int64_t numerator = 0;
     int64_t denominator = 0;
-    int64_t peer_group_start = 0;
+    int64_t peer_group_start = -1;
 };
 
 class WindowFunctionCumeDist final
@@ -272,7 +317,19 @@ public:
 
     void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const 
override {
         auto cume_dist = (double)data(place).numerator * 1.0 / 
(double)data(place).denominator;
-        assert_cast<ColumnFloat64&>(to).get_data().push_back(cume_dist);
+        assert_cast<ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
+                cume_dist);
+    }
+
+    bool result_column_could_resize() const override { return true; }
+
+    void insert_result_into_range(ConstAggregateDataPtr __restrict place, 
IColumn& to,
+                                  const size_t start, const size_t end) const 
override {
+        auto& column = assert_cast<ColumnFloat64&, 
TypeCheckOnRelease::DISABLE>(to);
+        auto cume_dist = (double)data(place).numerator * 1.0 / 
(double)data(place).denominator;
+        for (size_t i = start; i < end; ++i) {
+            column.get_data()[i] = cume_dist;
+        }
     }
 
     void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) 
const override {}
@@ -323,10 +380,20 @@ public:
     void reset(AggregateDataPtr place) const override { 
WindowFunctionNTile::data(place).rows = 0; }
 
     void insert_result_into(ConstAggregateDataPtr place, IColumn& to) const 
override {
-        assert_cast<ColumnInt64&>(to).get_data().push_back(
+        assert_cast<ColumnInt64&, 
TypeCheckOnRelease::DISABLE>(to).get_data().push_back(
                 WindowFunctionNTile::data(place).bucket_index);
     }
 
+    bool result_column_could_resize() const override { return true; }
+
+    void insert_result_into_range(ConstAggregateDataPtr __restrict place, 
IColumn& to,
+                                  const size_t start, const size_t end) const 
override {
+        auto& column = assert_cast<ColumnInt64&, 
TypeCheckOnRelease::DISABLE>(to);
+        for (size_t i = start; i < end; ++i) {
+            column.get_data()[i] = 
WindowFunctionNTile::data(place).bucket_index;
+        }
+    }
+
     void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena*) 
const override {}
     void serialize(ConstAggregateDataPtr place, BufferWritable& buf) const 
override {}
     void deserialize(AggregateDataPtr place, BufferReadable& buf, Arena*) 
const override {}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1345cac66cb..423f3355f58 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1118,6 +1118,8 @@ struct TAnalyticNode {
   9: optional Exprs.TExpr order_by_eq
 
   10: optional bool is_colocate
+
+  11: optional list<Exprs.TExpr> range_between_offset_exprs
 }
 
 struct TMergeNode {
diff --git 
a/regression-test/data/query_p0/sql_functions/window_functions/test_column_boundary.out
 
b/regression-test/data/query_p0/sql_functions/window_functions/test_column_boundary.out
new file mode 100644
index 00000000000..aef12dc42c5
Binary files /dev/null and 
b/regression-test/data/query_p0/sql_functions/window_functions/test_column_boundary.out
 differ
diff --git 
a/regression-test/suites/query_p0/sql_functions/window_functions/test_column_boundary.groovy
 
b/regression-test/suites/query_p0/sql_functions/window_functions/test_column_boundary.groovy
new file mode 100644
index 00000000000..bfdf501d904
--- /dev/null
+++ 
b/regression-test/suites/query_p0/sql_functions/window_functions/test_column_boundary.groovy
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_column_boundary") {
+    sql """ DROP TABLE IF EXISTS test_column_boundary """
+    sql """
+        CREATE TABLE IF NOT EXISTS test_column_boundary (
+            u_id int NULL COMMENT "",
+            u_city varchar(20) NULL COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`u_id`, `u_city`)
+        DISTRIBUTED BY HASH(`u_id`, `u_city`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "in_memory" = "false",
+        "storage_format" = "V2"
+    );
+    """
+
+    sql """ insert into test_column_boundary select number, number + random() 
from numbers("number" = "1000000"); """
+    Integer count = 0;
+    Integer maxCount = 25;
+    while (count < maxCount) {
+        sql """ insert into test_column_boundary select number, number + 
random() from numbers("number" = "10000000"); """
+        count++
+        sleep(100);
+    }
+    sql """ set parallel_pipeline_task_num = 1; """
+
+    qt_sql_1 """ select count() from test_column_boundary; """ // 251000000 rows (1M + 25 * 10M)
+    test {
+        // column size is too large
+        sql """ select sum(res) from (select count() over(partition by u_city) 
as res from test_column_boundary) as t; """
+        exception "string column length is too large"
+    }
+    sql """ DROP TABLE IF EXISTS test_column_boundary """
+}
+
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to