Gabriel39 commented on code in PR #23639: URL: https://github.com/apache/doris/pull/23639#discussion_r1309560444
########## be/src/pipeline/pipeline.h: ########## @@ -118,6 +147,34 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> { DataSinkOperatorXPtr _sink_x; std::shared_ptr<ObjectPool> _obj_pool; + + Operators _old_operators; Review Comment: Use `_operators` for `Operators` and `_operator_xs` for `OperatorXs`. ########## be/src/pipeline/pipeline.h: ########## @@ -45,31 +45,60 @@ using PipelineId = uint32_t; class Pipeline : public std::enable_shared_from_this<Pipeline> { friend class PipelineTask; + friend class PipelineXTask; public: Pipeline() = delete; explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context) - : _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) { + : _pipeline_id(pipeline_id), _context(context) { _init_profile(); } void add_dependency(std::shared_ptr<Pipeline>& pipeline) { - pipeline->_parents.push_back(weak_from_this()); - _dependencies.push_back(pipeline); + pipeline->_parents.push_back({_operator_builders.size(), weak_from_this()}); + _dependencies.push_back({_operator_builders.size(), pipeline}); } // If all dependencies are finished, this pipeline task should be scheduled. // e.g. Hash join probe task will be scheduled once Hash join build task is finished. 
- bool finish_one_dependency(int dependency_core_id) { - DCHECK(_complete_dependency < _dependencies.size()); - bool finish = _complete_dependency.fetch_add(1) == _dependencies.size() - 1; - if (finish) { + void finish_one_dependency(int dep_opr, int dependency_core_id) { + std::lock_guard l(_depend_mutex); + + int i; + auto dispose_short_circuit = [&](auto& operators) { + _always_can_read = true; + _always_can_write = (dep_opr == operators.size()); + + for (i = 0; i < _dependencies.size(); ++i) { + if (dep_opr == _dependencies[i].first) { + _dependencies.erase(_dependencies.begin(), _dependencies.begin() + i + 1); + break; + } + } + }; + + if (!_old_operators.empty() && _old_operators[dep_opr - 1]->can_terminate_early()) { + dispose_short_circuit(_old_operators); + } else if (!_operators.empty() && _operators[dep_opr - 1]->can_terminate_early()) { Review Comment: There is no need to check `_operators` here, because PipelineX no longer uses this logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org