HappenLee commented on code in PR #46181:
URL: https://github.com/apache/doris/pull/46181#discussion_r1913037509


##########
be/src/pipeline/exec/analytic_sink_operator.cpp:
##########
@@ -266,73 +714,133 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Block
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
-    local_state._shared_state->input_eos = eos;
-    if (local_state._shared_state->input_eos && input_block->rows() == 0) {
-        local_state._dependency->set_ready_to_read();
-        local_state._dependency->block();
-        return Status::OK();
+    local_state._input_eos = eos;
+    local_state._remove_unused_rows();
+    RETURN_IF_ERROR(_add_input_block(state, input_block));
+    RETURN_IF_ERROR(local_state._execute_impl());
+    if (local_state._input_eos) {
+        std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
+        local_state._shared_state->sink_eos = true;
+        local_state._dependency->set_ready_to_read(); // ready for source to 
read
     }
+    return Status::OK();
+}
 
-    local_state._shared_state->input_block_first_row_positions.emplace_back(
-            local_state._shared_state->input_total_rows);
+Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state,
+                                               vectorized::Block* input_block) 
{
+    if (input_block->rows() <= 0) {
+        return Status::OK();
+    }
+    auto& local_state = get_local_state(state);
+    
local_state._input_block_first_row_positions.emplace_back(local_state._input_total_rows);
     size_t block_rows = input_block->rows();
-    local_state._shared_state->input_total_rows += block_rows;
-    local_state._shared_state->all_block_end.block_num =
-            local_state._shared_state->input_blocks.size();
-    local_state._shared_state->all_block_end.row_num = block_rows;
-    local_state._shared_state->all_block_end.pos = 
local_state._shared_state->input_total_rows;
-
-    if (local_state._shared_state->origin_cols
-                .empty()) { //record origin columns, maybe be after this, 
could cast some column but no need to save
+    local_state._input_total_rows += block_rows;
+
+    // record origin columns, maybe be after this, could cast some column but 
no need to output
+    if (local_state._input_col_ids.empty()) {
         for (int c = 0; c < input_block->columns(); ++c) {
-            local_state._shared_state->origin_cols.emplace_back(c);
+            local_state._input_col_ids.emplace_back(c);
         }
     }
 
     {
         SCOPED_TIMER(local_state._compute_agg_data_timer);
-        for (size_t i = 0; i < _agg_functions_size;
-             ++i) { //insert _agg_input_columns, execute calculate for its
+        //insert _agg_input_columns, execute calculate for its, and those 
columns maybe could remove have used data
+        for (size_t i = 0; i < _agg_functions_size; ++i) {
             for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) {
-                RETURN_IF_ERROR(_insert_range_column(
-                        input_block, local_state._agg_expr_ctxs[i][j],
-                        
local_state._shared_state->agg_input_columns[i][j].get(), block_rows));
+                RETURN_IF_ERROR(_insert_range_column(input_block, 
local_state._agg_expr_ctxs[i][j],
+                                                     
local_state._agg_input_columns[i][j].get(),
+                                                     block_rows));
             }
         }
     }
     {
         SCOPED_TIMER(local_state._compute_partition_by_timer);
         for (size_t i = 0; i < local_state._partition_by_eq_expr_ctxs.size(); 
++i) {
-            int result_col_id = -1;
-            
RETURN_IF_ERROR(local_state._partition_by_eq_expr_ctxs[i]->execute(input_block,
-                                                                               
&result_col_id));
-            DCHECK_GE(result_col_id, 0);
-            local_state._shared_state->partition_by_column_idxs[i] = 
result_col_id;
+            RETURN_IF_ERROR(
+                    _insert_range_column(input_block, 
local_state._partition_by_eq_expr_ctxs[i],
+                                         
local_state._partition_by_columns[i].get(), block_rows));
         }
     }
 
     {
         SCOPED_TIMER(local_state._compute_order_by_timer);
         for (size_t i = 0; i < local_state._order_by_eq_expr_ctxs.size(); ++i) 
{
-            int result_col_id = -1;
-            RETURN_IF_ERROR(
-                    
local_state._order_by_eq_expr_ctxs[i]->execute(input_block, &result_col_id));
-            DCHECK_GE(result_col_id, 0);
-            local_state._shared_state->ordey_by_column_idxs[i] = result_col_id;
+            RETURN_IF_ERROR(_insert_range_column(input_block, 
local_state._order_by_eq_expr_ctxs[i],
+                                                 
local_state._order_by_columns[i].get(),
+                                                 block_rows));
+        }
+    }
+
+    {
+        SCOPED_TIMER(local_state._compute_order_by_function_timer);
+        // should change the order by exprs to range column, IF FE have 
support range window
+        for (size_t i = 0; i < local_state._order_by_eq_expr_ctxs.size(); ++i) 
{
+            RETURN_IF_ERROR(_insert_range_column(input_block, 
local_state._order_by_eq_expr_ctxs[i],
+                                                 
local_state._range_result_columns[i].get(),
+                                                 block_rows));
         }
     }
 

Review Comment:
   remove the unless column in here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to