HappenLee commented on code in PR #46181: URL: https://github.com/apache/doris/pull/46181#discussion_r1913037509
########## be/src/pipeline/exec/analytic_sink_operator.cpp: ########## @@ -266,73 +714,133 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); - local_state._shared_state->input_eos = eos; - if (local_state._shared_state->input_eos && input_block->rows() == 0) { - local_state._dependency->set_ready_to_read(); - local_state._dependency->block(); - return Status::OK(); + local_state._input_eos = eos; + local_state._remove_unused_rows(); + RETURN_IF_ERROR(_add_input_block(state, input_block)); + RETURN_IF_ERROR(local_state._execute_impl()); + if (local_state._input_eos) { + std::unique_lock<std::mutex> lc(local_state._shared_state->sink_eos_lock); + local_state._shared_state->sink_eos = true; + local_state._dependency->set_ready_to_read(); // ready for source to read } + return Status::OK(); +} - local_state._shared_state->input_block_first_row_positions.emplace_back( - local_state._shared_state->input_total_rows); +Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state, + vectorized::Block* input_block) { + if (input_block->rows() <= 0) { + return Status::OK(); + } + auto& local_state = get_local_state(state); + local_state._input_block_first_row_positions.emplace_back(local_state._input_total_rows); size_t block_rows = input_block->rows(); - local_state._shared_state->input_total_rows += block_rows; - local_state._shared_state->all_block_end.block_num = - local_state._shared_state->input_blocks.size(); - local_state._shared_state->all_block_end.row_num = block_rows; - local_state._shared_state->all_block_end.pos = local_state._shared_state->input_total_rows; - - if (local_state._shared_state->origin_cols - .empty()) { //record origin columns, maybe be after this, could cast some column but no need to save + local_state._input_total_rows += 
block_rows; + + // record origin columns, maybe be after this, could cast some column but no need to output + if (local_state._input_col_ids.empty()) { for (int c = 0; c < input_block->columns(); ++c) { - local_state._shared_state->origin_cols.emplace_back(c); + local_state._input_col_ids.emplace_back(c); } } { SCOPED_TIMER(local_state._compute_agg_data_timer); - for (size_t i = 0; i < _agg_functions_size; - ++i) { //insert _agg_input_columns, execute calculate for its + //insert _agg_input_columns, execute calculate for its, and those columns maybe could remove have used data + for (size_t i = 0; i < _agg_functions_size; ++i) { for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) { - RETURN_IF_ERROR(_insert_range_column( - input_block, local_state._agg_expr_ctxs[i][j], - local_state._shared_state->agg_input_columns[i][j].get(), block_rows)); + RETURN_IF_ERROR(_insert_range_column(input_block, local_state._agg_expr_ctxs[i][j], + local_state._agg_input_columns[i][j].get(), + block_rows)); } } } { SCOPED_TIMER(local_state._compute_partition_by_timer); for (size_t i = 0; i < local_state._partition_by_eq_expr_ctxs.size(); ++i) { - int result_col_id = -1; - RETURN_IF_ERROR(local_state._partition_by_eq_expr_ctxs[i]->execute(input_block, - &result_col_id)); - DCHECK_GE(result_col_id, 0); - local_state._shared_state->partition_by_column_idxs[i] = result_col_id; + RETURN_IF_ERROR( + _insert_range_column(input_block, local_state._partition_by_eq_expr_ctxs[i], + local_state._partition_by_columns[i].get(), block_rows)); } } { SCOPED_TIMER(local_state._compute_order_by_timer); for (size_t i = 0; i < local_state._order_by_eq_expr_ctxs.size(); ++i) { - int result_col_id = -1; - RETURN_IF_ERROR( - local_state._order_by_eq_expr_ctxs[i]->execute(input_block, &result_col_id)); - DCHECK_GE(result_col_id, 0); - local_state._shared_state->ordey_by_column_idxs[i] = result_col_id; + RETURN_IF_ERROR(_insert_range_column(input_block, local_state._order_by_eq_expr_ctxs[i], + 
local_state._order_by_columns[i].get(), + block_rows)); + } + } + + { + SCOPED_TIMER(local_state._compute_order_by_function_timer); + // should change the order by exprs to range column, IF FE have support range window + for (size_t i = 0; i < local_state._order_by_eq_expr_ctxs.size(); ++i) { + RETURN_IF_ERROR(_insert_range_column(input_block, local_state._order_by_eq_expr_ctxs[i], + local_state._range_result_columns[i].get(), + block_rows)); } } Review Comment: remove the useless column here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org