This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b9572f9de03 [pipelineX](fix) Fix pip scanner context bug (#29229)
b9572f9de03 is described below

commit b9572f9de038c29aa0e763f4ebcdc1395183ec55
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Dec 29 13:24:39 2023 +0800

    [pipelineX](fix) Fix pip scanner context bug (#29229)
---
 be/src/pipeline/exec/scan_operator.cpp     |   5 +-
 be/src/vec/exec/scan/pip_scanner_context.h | 118 +++++++++++++++++++++--------
 be/src/vec/exec/scan/scanner_context.cpp   |   3 +
 be/src/vec/exec/scan/scanner_context.h     |   2 +-
 4 files changed, 92 insertions(+), 36 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 0c0ccd42410..de3214b0465 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1239,10 +1239,9 @@ template <typename Derived>
 Status ScanLocalState<Derived>::_start_scanners(
         const std::list<vectorized::VScannerSPtr>& scanners) {
     auto& p = _parent->cast<typename Derived::Parent>();
-    _scanner_ctx = PipScannerContext::create_shared(
+    _scanner_ctx = PipXScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), 
scanners, p.limit(),
-            state()->scan_queue_mem_limit(), p._col_distribute_ids, 1, 
_scan_dependency,
-            _finish_dependency);
+            state()->scan_queue_mem_limit(), _scan_dependency, 
_finish_dependency);
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index fbf59fffab2..9a717ec08b2 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -21,11 +21,9 @@
 #include "runtime/descriptors.h"
 #include "scanner_context.h"
 
-namespace doris {
+namespace doris::pipeline {
 
-namespace pipeline {
-
-class PipScannerContext : public vectorized::ScannerContext {
+class PipScannerContext final : public vectorized::ScannerContext {
     ENABLE_FACTORY_CREATOR(PipScannerContext);
 
 public:
@@ -41,19 +39,6 @@ public:
               _col_distribute_ids(col_distribute_ids),
               _need_colocate_distribute(!_col_distribute_ids.empty()) {}
 
-    PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
-                      const TupleDescriptor* output_tuple_desc,
-                      const RowDescriptor* output_row_descriptor,
-                      const std::list<vectorized::VScannerSPtr>& scanners, 
int64_t limit_,
-                      int64_t max_bytes_in_blocks_queue, const 
std::vector<int>& col_distribute_ids,
-                      const int num_parallel_instances,
-                      std::shared_ptr<pipeline::ScanDependency> dependency,
-                      std::shared_ptr<pipeline::Dependency> finish_dependency)
-            : vectorized::ScannerContext(state, output_tuple_desc, 
output_row_descriptor, scanners,
-                                         limit_, max_bytes_in_blocks_queue, 
num_parallel_instances,
-                                         local_state, dependency, 
finish_dependency),
-              _need_colocate_distribute(false) {}
-
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* 
block, bool* eos,
                                 int id, bool wait = false) override {
         {
@@ -95,9 +80,6 @@ public:
 
             if (_blocks_queues[id].empty()) {
                 this->reschedule_scanner_ctx();
-                if (_dependency) {
-                    _dependency->block();
-                }
             }
         }
 
@@ -180,9 +162,6 @@ public:
                     for (int j = i; j < block_size; j += queue_size) {
                         
_blocks_queues[queue].emplace_back(std::move(blocks[j]));
                     }
-                    if (_dependency) {
-                        _dependency->set_ready();
-                    }
                 }
                 _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
             }
@@ -232,9 +211,6 @@ public:
                     
_blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
                     _colocate_mutable_blocks[i]->clear();
                 }
-                if (_dependency) {
-                    _dependency->set_ready();
-                }
             }
         }
     }
@@ -248,7 +224,7 @@ public:
         return res;
     }
 
-private:
+protected:
     int _next_queue_to_feed = 0;
     std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
     std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
@@ -286,9 +262,6 @@ private:
                     std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
                     
_blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
                 }
-                if (_dependency) {
-                    _dependency->set_ready();
-                }
                 _colocate_blocks[loc] = get_free_block();
                 _colocate_mutable_blocks[loc]->set_mutable_columns(
                         _colocate_blocks[loc]->mutate_columns());
@@ -297,5 +270,86 @@ private:
     }
 };
 
-} // namespace pipeline
-} // namespace doris
+class PipXScannerContext final : public vectorized::ScannerContext {
+    ENABLE_FACTORY_CREATOR(PipXScannerContext);
+
+public:
+    PipXScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
+                       const TupleDescriptor* output_tuple_desc,
+                       const RowDescriptor* output_row_descriptor,
+                       const std::list<vectorized::VScannerSPtr>& scanners, 
int64_t limit_,
+                       int64_t max_bytes_in_blocks_queue,
+                       std::shared_ptr<pipeline::ScanDependency> dependency,
+                       std::shared_ptr<pipeline::Dependency> finish_dependency)
+            : vectorized::ScannerContext(state, output_tuple_desc, 
output_row_descriptor, scanners,
+                                         limit_, max_bytes_in_blocks_queue, 1, 
local_state,
+                                         dependency, finish_dependency) {}
+    Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* 
block, bool* eos,
+                                int id, bool wait = false) override {
+        std::unique_lock l(_transfer_lock);
+        if (state->is_cancelled()) {
+            set_status_on_error(Status::Cancelled("cancelled"), false);
+        }
+
+        if (!status().ok()) {
+            return _process_status;
+        }
+
+        std::vector<vectorized::BlockUPtr> merge_blocks;
+        if (_blocks_queue.empty()) {
+            *eos = done();
+            return Status::OK();
+        }
+        if (_process_status.is<ErrorCode::CANCELLED>()) {
+            *eos = true;
+            return Status::OK();
+        }
+        *block = std::move(_blocks_queue.front());
+        _blocks_queue.pop_front();
+
+        auto rows = (*block)->rows();
+        while (!_blocks_queue.empty()) {
+            const auto add_rows = (*_blocks_queue.front()).rows();
+            if (rows + add_rows < state->batch_size()) {
+                rows += add_rows;
+                merge_blocks.emplace_back(std::move(_blocks_queue.front()));
+                _blocks_queue.pop_front();
+            } else {
+                break;
+            }
+        }
+
+        if (_blocks_queue.empty()) {
+            this->reschedule_scanner_ctx();
+            _dependency->block();
+        }
+
+        _cur_bytes_in_queue -= (*block)->allocated_bytes();
+        if (!merge_blocks.empty()) {
+            vectorized::MutableBlock m(block->get());
+            for (auto& merge_block : merge_blocks) {
+                _cur_bytes_in_queue -= merge_block->allocated_bytes();
+                static_cast<void>(m.merge(*merge_block));
+                return_free_block(std::move(merge_block));
+            }
+            (*block)->set_columns(std::move(m.mutable_columns()));
+        }
+
+        return Status::OK();
+    }
+
+    void reschedule_scanner_ctx() override {
+        if (done()) {
+            return;
+        }
+        auto state = _scanner_scheduler->submit(shared_from_this());
+        //todo(wb) rethinking is it better to mark current scan_context failed 
when submit failed many times?
+        if (state.ok()) {
+            _num_scheduling_ctx++;
+        } else {
+            set_status_on_error(state, false);
+        }
+    }
+};
+
+} // namespace doris::pipeline
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 954c294574f..16bb1ce8487 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -259,6 +259,9 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
         _blocks_queue.push_back(std::move(b));
     }
     blocks.clear();
+    if (_dependency) {
+        _dependency->set_ready();
+    }
     _blocks_queue_added_cv.notify_one();
     _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
 }
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 6a3e8553f8f..035d396bf65 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -170,7 +170,7 @@ public:
 
     SimplifiedScanScheduler* get_simple_scan_scheduler() { return 
_simple_scan_scheduler; }
 
-    void reschedule_scanner_ctx();
+    virtual void reschedule_scanner_ctx();
 
     // the unique id of this context
     std::string ctx_id;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to