This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 59acf61ec5 [pipelineX](pick) pick 2 PR from pipeline engine (#23463)
59acf61ec5 is described below

commit 59acf61ec5d487ccb98498174fe922318ba6ab4b
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Aug 25 13:26:05 2023 +0800

    [pipelineX](pick) pick 2 PR from pipeline engine (#23463)
---
 be/src/pipeline/pipeline_x/dependency.cpp      |  2 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 10 +++++-----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index 49262ce0e5..fab4db32f9 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -181,7 +181,7 @@ vectorized::BlockRowPos AnalyticDependency::compare_row_to_find_end(int idx,
     }
     //binary search, set start and end pos
     int64_t start_pos = start_init_row_num;
-    int64_t end_pos = _analytic_state.input_blocks[start.block_num].rows() - 1;
+    int64_t end_pos = _analytic_state.input_blocks[start.block_num].rows();
     //if end_block_num haven't moved, only start_block_num go to the end block
     //so could use the end.row_num for binary search
     if (start.block_num == end.block_num) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index d6d24d4e09..676414fce5 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -142,23 +142,23 @@ Status PipelineXTask::execute(bool* eos) {
             set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
             return Status::OK();
         }
-        if (!_source->can_read(_state)) {
+        if (!source_can_read()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
             return Status::OK();
         }
-        if (!_sink->can_write(_state)) {
+        if (!sink_can_write()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SINK);
             return Status::OK();
         }
     }
 
-    this->set_begin_execute_time();
+    set_begin_execute_time();
     while (!_fragment_context->is_canceled()) {
-        if (_data_state != SourceState::MORE_DATA && !_source->can_read(_state)) {
+        if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
             break;
         }
-        if (!_sink->can_write(_state)) {
+        if (!sink_can_write()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SINK);
             break;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to