This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 59acf61ec5 [pipelineX](pick) pick 2 PR from pipeline engine (#23463) 59acf61ec5 is described below commit 59acf61ec5d487ccb98498174fe922318ba6ab4b Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Aug 25 13:26:05 2023 +0800 [pipelineX](pick) pick 2 PR from pipeline engine (#23463) --- be/src/pipeline/pipeline_x/dependency.cpp | 2 +- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp index 49262ce0e5..fab4db32f9 100644 --- a/be/src/pipeline/pipeline_x/dependency.cpp +++ b/be/src/pipeline/pipeline_x/dependency.cpp @@ -181,7 +181,7 @@ vectorized::BlockRowPos AnalyticDependency::compare_row_to_find_end(int idx, } //binary search, set start and end pos int64_t start_pos = start_init_row_num; - int64_t end_pos = _analytic_state.input_blocks[start.block_num].rows() - 1; + int64_t end_pos = _analytic_state.input_blocks[start.block_num].rows(); //if end_block_num haven't moved, only start_block_num go to the end block //so could use the end.row_num for binary search if (start.block_num == end.block_num) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index d6d24d4e09..676414fce5 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -142,23 +142,23 @@ Status PipelineXTask::execute(bool* eos) { set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY); return Status::OK(); } - if (!_source->can_read(_state)) { + if (!source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); return Status::OK(); } - if (!_sink->can_write(_state)) { + if (!sink_can_write()) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); return Status::OK(); } } - this->set_begin_execute_time(); + set_begin_execute_time(); while (!_fragment_context->is_canceled()) { - if (_data_state != SourceState::MORE_DATA && !_source->can_read(_state)) { + if (_data_state != SourceState::MORE_DATA && !source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); break; } - if (!_sink->can_write(_state)) { + if (!sink_can_write()) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); break; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org