This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 90669039726 [pick](branch-3.0) pick #41292 #41589 #41628 #41743 #41845 #41857 #41902 #41601 #41667 #41751 (#42031)
90669039726 is described below

commit 906690397265dc285f726b1444c36f84d7e383b4
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Oct 17 19:21:19 2024 +0800

    [pick](branch-3.0) pick #41292 #41589 #41628 #41743 #41845 #41857 #41902 #41601 #41667 #41751 (#42031)
    
    pick #41292 #41589 #41628 #41743 #41845 #41857 #41902 #41601 #41667
    #41751
    
    Co-authored-by: Pxl <pxl...@qq.com>
---
 be/src/exprs/bloom_filter_func.h                   |  4 +--
 be/src/exprs/runtime_filter.cpp                    | 13 ++++---
 be/src/exprs/runtime_filter.h                      |  7 ----
 be/src/exprs/runtime_filter_slots.h                | 12 ++++---
 be/src/pipeline/common/runtime_filter_consumer.cpp |  1 -
 be/src/pipeline/common/runtime_filter_consumer.h   |  1 -
 be/src/pipeline/exec/analytic_sink_operator.h      |  5 ++-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 42 ++++++++++++++--------
 be/src/pipeline/exec/multi_cast_data_stream_sink.h | 15 +++++---
 be/src/pipeline/exec/operator.h                    |  3 ++
 .../exec/partitioned_aggregation_sink_operator.h   |  1 -
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  1 -
 .../local_exchange_sink_operator.cpp               | 36 +++++++------------
 .../local_exchange/local_exchange_sink_operator.h  |  3 +-
 be/src/pipeline/pipeline.cpp                       | 13 ++++++-
 be/src/pipeline/pipeline.h                         | 17 ++++++++-
 be/src/pipeline/pipeline_fragment_context.cpp      | 19 +++++++---
 be/src/pipeline/pipeline_fragment_context.h        |  3 +-
 be/src/pipeline/pipeline_task.cpp                  | 30 +++++++++++++---
 be/src/pipeline/pipeline_task.h                    | 14 +++++---
 be/src/pipeline/task_scheduler.cpp                 |  2 +-
 be/src/runtime/fragment_mgr.cpp                    | 14 +++++---
 be/src/runtime/runtime_filter_mgr.cpp              |  4 ++-
 be/src/vec/runtime/shared_hash_table_controller.h  |  1 +
 be/test/exprs/runtime_filter_test.cpp              |  5 ---
 25 files changed, 167 insertions(+), 99 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 95d50642448..ae206615bd7 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -157,19 +157,19 @@ public:
     }
 
     Status merge(BloomFilterFuncBase* bloomfilter_func) {
+        DCHECK(bloomfilter_func != nullptr);
+        DCHECK(bloomfilter_func->_bloom_filter != nullptr);
         // If `_inited` is false, there is no memory allocated in bloom filter and this is the first
         // call for `merge` function. So we just reuse this bloom filter, and we don't need to
         // allocate memory again.
         if (!_inited) {
             auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
             DCHECK(_bloom_filter == nullptr);
-            DCHECK(bloomfilter_func != nullptr);
             _bloom_filter = bloomfilter_func->_bloom_filter;
             _bloom_filter_alloced = other_func->_bloom_filter_alloced;
             _inited = true;
             return Status::OK();
         }
-        DCHECK(bloomfilter_func != nullptr);
         auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
         if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
             return Status::InternalError(
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c3404a15355..b9bf837cf56 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -472,10 +472,10 @@ public:
                           const TExpr& probe_expr);
 
     Status merge(const RuntimePredicateWrapper* wrapper) {
-        if (is_ignored() || wrapper->is_ignored()) {
-            _context->ignored = true;
+        if (wrapper->is_ignored()) {
             return Status::OK();
         }
+        _context->ignored = false;
 
         bool can_not_merge_in_or_bloom =
                 _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
@@ -493,7 +493,10 @@ public:
 
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
-            // try insert set
+            if (!_context->hybrid_set) {
+                _context->ignored = true;
+                return Status::OK();
+            }
             _context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
             if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
                 _context->ignored = true;
@@ -1307,10 +1310,6 @@ std::string IRuntimeFilter::formatted_state() const {
             _wrapper->_context->ignored);
 }
 
-BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
-    return _wrapper->get_bloomfilter();
-}
-
 Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
                                       int node_id, bool build_bf_exactly) {
     // if node_id == -1 , it shouldn't be a consumer
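
The rewritten merge() above inverts the ignored-flag handshake: a merge target no longer propagates `ignored` from either side; it starts out ignored (see the set_ignored() calls added to runtime_filter_mgr.cpp further down) and is activated by the first non-ignored wrapper merged into it. A minimal sketch of that contract, using toy types rather than the real RuntimePredicateWrapper API:

    #include <set>

    // Toy stand-in for a merge target: it starts "ignored" and only becomes
    // active once a producer with real content merges into it.
    struct ToyFilter {
        bool ignored = true;   // newly created merge targets start ignored
        std::set<int> values;

        void merge(const ToyFilter& other) {
            if (other.ignored) {
                return;        // an ignored producer contributes nothing
            }
            ignored = false;   // the first real contribution activates the target
            values.insert(other.values.begin(), other.values.end());
        }
    };
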
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index f199e173e84..629e5fa2550 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -197,7 +197,6 @@ public:
               _is_broadcast_join(true),
               _has_remote_target(false),
               _has_local_target(false),
-              _rf_state(RuntimeFilterState::NOT_READY),
               _rf_state_atomic(RuntimeFilterState::NOT_READY),
               _role(RuntimeFilterRole::PRODUCER),
               _expr_order(-1),
@@ -263,8 +262,6 @@ public:
     Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
                           int node_id = -1, bool build_bf_exactly = false);
 
-    BloomFilterFuncBase* get_bloomfilter() const;
-
     // serialize _wrapper to protobuf
     Status serialize(PMergeFilterRequest* request, void** data, int* len);
     Status serialize(PPublishFilterRequest* request, void** data = nullptr, int* len = nullptr);
@@ -365,9 +362,6 @@ protected:
     void to_protobuf(PInFilter* filter);
     void to_protobuf(PMinMaxFilter* filter);
 
-    template <class T>
-    Status _update_filter(const T* param);
-
     template <class T>
     Status serialize_impl(T* request, void** data, int* len);
 
@@ -397,7 +391,6 @@ protected:
     // will apply to local node
     bool _has_local_target;
     // filter is ready for consumer
-    RuntimeFilterState _rf_state;
     std::atomic<RuntimeFilterState> _rf_state_atomic;
     // role consumer or producer
     RuntimeFilterRole _role;
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index c0a249cd6b0..d330d327149 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -98,12 +98,16 @@ public:
         return Status::OK();
     }
 
+    Status ignore_all_filters() {
+        for (auto filter : _runtime_filters) {
+            filter->set_ignored();
+        }
+        return Status::OK();
+    }
+
     Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
         // process IN_OR_BLOOM_FILTER's real type
         for (auto filter : _runtime_filters) {
-            if (filter->get_ignored()) {
-                continue;
-            }
             if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
                 get_real_size(filter.get(), local_hash_table_size) >
                         state->runtime_filter_max_in_num()) {
@@ -141,7 +145,7 @@ public:
     }
 
     // publish runtime filter
-    Status publish(bool publish_local = false) {
+    Status publish(bool publish_local) {
         for (auto& pair : _runtime_filters_map) {
             for (auto& filter : pair.second) {
                 RETURN_IF_ERROR(filter->publish(publish_local));
diff --git a/be/src/pipeline/common/runtime_filter_consumer.cpp b/be/src/pipeline/common/runtime_filter_consumer.cpp
index 817c76a79af..29279824964 100644
--- a/be/src/pipeline/common/runtime_filter_consumer.cpp
+++ b/be/src/pipeline/common/runtime_filter_consumer.cpp
@@ -76,7 +76,6 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
         auto runtime_filter = _runtime_filter_ctxs[i].runtime_filter;
         runtime_filter_dependencies[i] = std::make_shared<pipeline::RuntimeFilterDependency>(
                 id, node_id, name, runtime_filter.get());
-        _runtime_filter_ctxs[i].runtime_filter_dependency = runtime_filter_dependencies[i].get();
         runtime_filter_timers[i] = std::make_shared<pipeline::RuntimeFilterTimer>(
                 runtime_filter->registration_time(), runtime_filter->wait_time_ms(),
                 runtime_filter_dependencies[i]);
diff --git a/be/src/pipeline/common/runtime_filter_consumer.h b/be/src/pipeline/common/runtime_filter_consumer.h
index 03868355875..c1e5ea91bc8 100644
--- a/be/src/pipeline/common/runtime_filter_consumer.h
+++ b/be/src/pipeline/common/runtime_filter_consumer.h
@@ -61,7 +61,6 @@ protected:
         // set to true if this runtime filter is already applied to vconjunct_ctx_ptr
         bool apply_mark = false;
         std::shared_ptr<IRuntimeFilter> runtime_filter;
-        pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr;
     };
 
     std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 8f7a9d1c35a..ee305a877f5 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -80,17 +80,16 @@ public:
     DataDistribution required_data_distribution() const override {
         if (_partition_by_eq_expr_ctxs.empty()) {
             return {ExchangeType::PASSTHROUGH};
-        } else if (_order_by_eq_expr_ctxs.empty()) {
+        } else {
             return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         }
-        return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
     }
 
     bool require_data_distribution() const override { return true; }
     bool require_shuffled_data_distribution() const override {
-        return !_partition_by_eq_expr_ctxs.empty() && _order_by_eq_expr_ctxs.empty();
+        return !_partition_by_eq_expr_ctxs.empty();
     }
 
 private:
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8f7b176a979..5be3fcad112 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -22,6 +22,7 @@
 #include "exprs/bloom_filter_func.h"
 #include "pipeline/exec/hashjoin_probe_operator.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/utils/template_helpers.hpp"
 
@@ -111,6 +112,9 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
 }
 
 Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
     auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
     Defer defer {[&]() {
         if (_should_build_hash_table && p._shared_hashtable_controller) {
@@ -119,25 +123,30 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
     }};
 
     if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
-        return Status::OK();
+        return Base::close(state, exec_status);
     }
-    auto* block = _shared_state->build_block.get();
-    uint64_t hash_table_size = block ? block->rows() : 0;
-    {
-        SCOPED_TIMER(_runtime_filter_init_timer);
-        if (_should_build_hash_table) {
-            RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
+
+    if (state->get_task()->wake_up_by_downstream()) {
+        RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
+        RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
+    } else {
+        auto* block = _shared_state->build_block.get();
+        uint64_t hash_table_size = block ? block->rows() : 0;
+        {
+            SCOPED_TIMER(_runtime_filter_init_timer);
+            if (_should_build_hash_table) {
+                RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
+            }
+            RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
+        }
+        if (_should_build_hash_table && hash_table_size > 1) {
+            SCOPED_TIMER(_runtime_filter_compute_timer);
+            _runtime_filter_slots->insert(block);
         }
-        RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
-    }
-    if (_should_build_hash_table && hash_table_size > 1) {
-        SCOPED_TIMER(_runtime_filter_compute_timer);
-        _runtime_filter_slots->insert(block);
     }
-
     SCOPED_TIMER(_publish_runtime_filter_timer);
     RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
-    return Status::OK();
+    return Base::close(state, exec_status);
 }
 
 bool HashJoinBuildSinkLocalState::build_unique() const {
@@ -504,6 +513,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
 
+    local_state._eos = eos;
     if (local_state._should_build_hash_table) {
         // If eos or have already met a null value using short-circuit strategy, we do not need to pull
         // data from probe side.
@@ -555,6 +565,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                 local_state.process_build_block(state, (*local_state._shared_state->build_block)));
         if (_shared_hashtable_controller) {
             _shared_hash_table_context->status = Status::OK();
+            _shared_hash_table_context->complete_build_stage = true;
             // arena will be shared with other instances.
             _shared_hash_table_context->arena = local_state._shared_state->arena;
             _shared_hash_table_context->hash_table_variants =
@@ -567,7 +578,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
             local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
             _shared_hashtable_controller->signal(node_id());
         }
-    } else if (!local_state._should_build_hash_table) {
+    } else if (!local_state._should_build_hash_table &&
+               _shared_hash_table_context->complete_build_stage) {
         DCHECK(_shared_hashtable_controller != nullptr);
         DCHECK(_shared_hash_table_context != nullptr);
         // the instance which is not build hash table, it's should wait the signal of hash table build finished.
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index 1a9787789dd..57b5974064b 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -42,15 +42,15 @@ class MultiCastDataStreamSinkOperatorX final
     using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
 
 public:
-    MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
-                                     const int cast_sender_count, ObjectPool* pool,
+    MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, ObjectPool* pool,
                                      const TMultiCastDataStreamSink& sink,
                                      const RowDescriptor& row_desc)
             : Base(sink_id, -1, sources),
               _pool(pool),
               _row_desc(row_desc),
-              _cast_sender_count(cast_sender_count),
-              _sink(sink) {}
+              _cast_sender_count(sources.size()),
+              _sink(sink),
+              _num_dests(sources.size()) {}
     ~MultiCastDataStreamSinkOperatorX() override = default;
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
@@ -60,14 +60,19 @@ public:
     std::shared_ptr<BasicSharedState> create_shared_state() const override;
 
     const TMultiCastDataStreamSink& sink_node() { return _sink; }
+    bool count_down_destination() override {
+        DCHECK_GT(_num_dests, 0);
+        return _num_dests.fetch_sub(1) == 1;
+    }
 
 private:
     friend class MultiCastDataStreamSinkLocalState;
     ObjectPool* _pool;
     RowDescriptor _row_desc;
-    const int _cast_sender_count;
+    const size_t _cast_sender_count;
     const TMultiCastDataStreamSink& _sink;
     friend class MultiCastDataStreamSinkLocalState;
+    std::atomic<size_t> _num_dests;
 };
 
 } // namespace doris::pipeline
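
The count_down_destination() override above is what lets exactly one closing task trigger Pipeline::make_all_runnable() (added in pipeline.cpp further down): fetch_sub returns the counter's previous value, so only the caller that takes it from 1 to 0 observes 1. A standalone sketch of the pattern, with illustrative names:

    #include <atomic>
    #include <cassert>
    #include <cstddef>

    struct CountDown {
        explicit CountDown(std::size_t n) : _n(n) {}

        // True for exactly one caller: fetch_sub is a single atomic
        // read-modify-write, so two threads can never both observe 1.
        bool count_down() {
            assert(_n.load() > 0);
            return _n.fetch_sub(1) == 1;
        }

        std::atomic<std::size_t> _n;
    };

The virtual default added in operator.h below simply returns true, so sinks with a single destination wake their upstream pipeline on the first close.
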
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 0d0b0c05b36..b848aea6e1e 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -359,6 +359,7 @@ protected:
     // Set to true after close() has been called. subclasses should check and set this in
     // close().
     bool _closed = false;
+    std::atomic<bool> _eos = false;
     //NOTICE: now add a faker profile, because sometimes the profile record is useless
     //so we want remove some counters and timers, eg: in join node, if it's broadcast_join
     //and shared hash table, some counter/timer about build hash table is useless,
@@ -514,6 +515,8 @@ public:
 
     virtual bool should_dry_run(RuntimeState* state) { return false; }
 
+    [[nodiscard]] virtual bool count_down_destination() { return true; }
+
 protected:
     template <typename Writer, typename Parent>
         requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 259d7580877..6b3a74c83df 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -259,7 +259,6 @@ public:
 
     std::unique_ptr<RuntimeState> _runtime_state;
 
-    bool _eos = false;
     std::shared_ptr<Dependency> _finish_dependency;
 
     // temp structures during spilling
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index e74b5d2a414..2c820d9fa09 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -54,7 +54,6 @@ private:
 
     RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;
 
-    bool _eos = false;
     vectorized::SpillStreamSPtr _spilling_stream;
     std::shared_ptr<Dependency> _finish_dependency;
 };
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 19c37f3649b..d87113ca80a 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -41,6 +41,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
     _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
     _type = type;
     if (_type == ExchangeType::HASH_SHUFFLE) {
+        _use_global_shuffle = should_disable_bucket_shuffle;
         // For shuffle join, if data distribution has been broken by previous operator, we
         // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
         // we should use map shuffle idx to instance idx because all instances will be
@@ -84,6 +85,11 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
     SCOPED_TIMER(_init_timer);
     _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
     _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
+    if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) {
+        _profile->add_info_string(
+                "UseGlobalShuffle",
+                std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
+    }
     _channel_id = info.task_idx;
     return Status::OK();
 }
@@ -104,30 +110,16 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
-        return Status::OK();
-    }
-    RETURN_IF_ERROR(Base::close(state, exec_status));
-    if (exec_status.ok()) {
-        DCHECK(_release_count) << "Do not finish correctly! " << debug_string(0)
-                               << " state: { cancel = " << state->is_cancelled() << ", "
-                               << state->cancel_reason().to_string() << "} query ctx: { cancel = "
-                               << state->get_query_ctx()->is_cancelled() << ", "
-                               << state->get_query_ctx()->exec_status().to_string() << "}";
-    }
-    return Status::OK();
-}
-
 std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
-                   "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
-                   "_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}",
-                   Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
-                   _exchanger->_num_senders, _exchanger->_num_sources,
-                   _exchanger->_running_sink_operators, _exchanger->_running_source_operators,
-                   _release_count);
+                   "{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, "
+                   "_num_senders: {}, _num_sources: {}, "
+                   "_running_sink_operators: {}, _running_source_operators: {}",
+                   Base::debug_string(indentation_level),
+                   _parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
+                   _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources,
+                   _exchanger->_running_sink_operators, _exchanger->_running_source_operators);
     return fmt::to_string(debug_string_buffer);
 }
 
@@ -140,13 +132,11 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
 
     // If all exchange sources ended due to limit reached, current task should also finish
     if (local_state._exchanger->_running_source_operators == 0) {
-        local_state._release_count = true;
         local_state._shared_state->sub_running_sink_operators();
         return Status::EndOfFile("receiver eof");
     }
     if (eos) {
         local_state._shared_state->sub_running_sink_operators();
-        local_state._release_count = true;
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 7a98840b4b3..1cd9736d429 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -43,7 +43,6 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
-    Status close(RuntimeState* state, Status exec_status) override;
     std::string debug_string(int indentation_level) const override;
     std::vector<Dependency*> dependencies() const override;
 
@@ -69,7 +68,6 @@ private:
 
     // Used by random passthrough exchanger
     int _channel_id = 0;
-    bool _release_count = false;
 };
 
 // A single 32-bit division on a recent x64 processor has a throughput of one instruction every six cycles with a latency of 26 cycles.
@@ -118,6 +116,7 @@ private:
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     const std::map<int, int> _bucket_seq_to_instance_idx;
     std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
+    bool _use_global_shuffle = false;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 74e15d7cc93..6e83c7805e4 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -22,6 +22,7 @@
 #include <utility>
 
 #include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
 
 namespace doris::pipeline {
 
@@ -65,4 +66,14 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
     return Status::OK();
 }
 
-} // namespace doris::pipeline
\ No newline at end of file
+void Pipeline::make_all_runnable() {
+    if (_sink->count_down_destination()) {
+        for (auto* task : _tasks) {
+            if (task) {
+                task->clear_blocking_state(true);
+            }
+        }
+    }
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index dfeb53ae006..8a20ccb631c 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -47,6 +47,7 @@ public:
                       std::weak_ptr<PipelineFragmentContext> context)
             : _pipeline_id(pipeline_id), _num_tasks(num_tasks) {
         _init_profile();
+        _tasks.resize(_num_tasks, nullptr);
     }
 
     // Add operators for pipelineX
@@ -104,14 +105,24 @@ public:
     void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
     void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
 
-    void incr_created_tasks() { _num_tasks_created++; }
+    void incr_created_tasks(int i, PipelineTask* task) {
+        _num_tasks_created++;
+        _num_tasks_running++;
+        DCHECK_LT(i, _tasks.size());
+        _tasks[i] = task;
+    }
+
+    void make_all_runnable();
+
     void set_num_tasks(int num_tasks) {
         _num_tasks = num_tasks;
+        _tasks.resize(_num_tasks, nullptr);
         for (auto& op : _operators) {
             op->set_parallel_tasks(_num_tasks);
         }
     }
     int num_tasks() const { return _num_tasks; }
+    bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }
 
     std::string debug_string() {
         fmt::memory_buffer debug_string_buffer;
@@ -158,6 +169,10 @@ private:
     int _num_tasks = 1;
     // How many tasks are already created?
     std::atomic<int> _num_tasks_created = 0;
+    // How many tasks are already created and not finished?
+    std::atomic<int> _num_tasks_running = 0;
+    // Tasks in this pipeline.
+    std::vector<PipelineTask*> _tasks;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 7538e580c3e..8c998ab8c2f 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -142,6 +142,8 @@ PipelineFragmentContext::~PipelineFragmentContext() {
             runtime_state.reset();
         }
     }
+    _dag.clear();
+    _pip_id_to_pipeline.clear();
     _pipelines.clear();
     _sink.reset();
     _root_op.reset();
@@ -368,6 +370,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
     _task_runtime_states.resize(_pipelines.size());
     for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
         _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
+        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
     }
     auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
 
@@ -473,6 +476,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
                                                            task_runtime_state.get(), this,
                                                            pipeline_id_to_profile[pip_idx].get(),
                                                            get_local_exchange_state(pipeline), i);
+                pipeline->incr_created_tasks(i, task.get());
                 task_runtime_state->set_task(task.get());
                 pipeline_id_to_task.insert({pipeline->id(), task.get()});
                 _tasks[i].emplace_back(std::move(task));
@@ -573,7 +577,6 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
         }
     }
     _pipeline_parent_map.clear();
-    _dag.clear();
     _op_id_to_le_state.clear();
 
     return Status::OK();
@@ -1151,8 +1154,7 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
         }
 
         _sink.reset(new MultiCastDataStreamSinkOperatorX(
-                sink_id, sources, thrift_sink.multi_cast_stream_sink.sinks.size(), pool,
-                thrift_sink.multi_cast_stream_sink, row_desc));
+                sink_id, sources, pool, thrift_sink.multi_cast_stream_sink, row_desc));
         for (int i = 0; i < sender_size; ++i) {
             auto new_pipeline = add_pipeline();
             RowDescriptor* _row_desc = nullptr;
@@ -1711,7 +1713,16 @@ void PipelineFragmentContext::_close_fragment_instance() {
            std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }
 
-void PipelineFragmentContext::close_a_pipeline() {
+void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
+    // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
+    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
+    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
+        if (_dag.contains(pipeline_id)) {
+            for (auto dep : _dag[pipeline_id]) {
+                _pip_id_to_pipeline[dep]->make_all_runnable();
+            }
+        }
+    }
     std::lock_guard<std::mutex> l(_task_mutex);
     g_pipeline_tasks_count << -1;
     ++_closed_tasks;
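
close_a_pipeline() now also walks the stored DAG: when the last task of a pipeline closes, the pipelines recorded as its dependencies are made fully runnable, so their tasks can observe the downstream wake-up and drain instead of blocking forever. This is also why _dag is no longer cleared at the end of _build_pipeline_tasks() and is only released in the destructor. A single-threaded toy of the traversal (the real code uses atomics and the sink's destination count-down, both omitted here):

    #include <map>
    #include <vector>

    struct ToyTask { bool blocked = true; };

    struct ToyPipeline {
        int running = 0;                   // tasks created and not yet closed
        std::vector<ToyTask*> tasks;

        bool close_task() { return --running == 0; }  // true on the last close
        void make_all_runnable() {
            for (auto* t : tasks) t->blocked = false;
        }
    };

    // dag maps a pipeline id to the ids of the pipelines it depends on; once
    // all of `id`'s tasks are closed, those pipelines need not stay blocked.
    void close_a_pipeline(int id, std::map<int, ToyPipeline*>& pipelines,
                          std::map<int, std::vector<int>>& dag) {
        if (pipelines[id]->close_task()) {
            if (auto it = dag.find(id); it != dag.end()) {
                for (int dep : it->second) pipelines[dep]->make_all_runnable();
            }
        }
    }
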
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index f95eb03fb12..b20c324756c 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -102,7 +102,7 @@ public:
 
     [[nodiscard]] int get_fragment_id() const { return _fragment_id; }
 
-    void close_a_pipeline();
+    void close_a_pipeline(PipelineId pipeline_id);
 
     Status send_report(bool);
 
@@ -295,6 +295,7 @@ private:
     std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
             _op_id_to_le_state;
 
+    std::map<PipelineId, Pipeline*> _pip_id_to_pipeline;
     // UniqueId -> runtime mgr
     std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;
 
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 4f362ac5042..e06b8028c9c 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -71,7 +71,6 @@ PipelineTask::PipelineTask(
     if (shared_state) {
         _sink_shared_state = shared_state;
     }
-    pipeline->incr_created_tasks();
 }
 
 Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink,
@@ -228,6 +227,9 @@ bool PipelineTask::_wait_to_start() {
     _blocked_dep = _execution_dep->is_blocked_by(this);
     if (_blocked_dep != nullptr) {
         static_cast<Dependency*>(_blocked_dep)->start_watcher();
+        if (_wake_up_by_downstream) {
+            _eos = true;
+        }
         return true;
     }
 
@@ -235,6 +237,9 @@ bool PipelineTask::_wait_to_start() {
         _blocked_dep = op_dep->is_blocked_by(this);
         if (_blocked_dep != nullptr) {
             _blocked_dep->start_watcher();
+            if (_wake_up_by_downstream) {
+                _eos = true;
+            }
             return true;
         }
     }
@@ -250,6 +255,9 @@ bool PipelineTask::_is_blocked() {
                 _blocked_dep = dep->is_blocked_by(this);
                 if (_blocked_dep != nullptr) {
                     _blocked_dep->start_watcher();
+                    if (_wake_up_by_downstream) {
+                        _eos = true;
+                    }
                     return true;
                 }
             }
@@ -269,6 +277,9 @@ bool PipelineTask::_is_blocked() {
         _blocked_dep = op_dep->is_blocked_by(this);
         if (_blocked_dep != nullptr) {
             _blocked_dep->start_watcher();
+            if (_wake_up_by_downstream) {
+                _eos = true;
+            }
             return true;
         }
     }
@@ -279,7 +290,7 @@ Status PipelineTask::execute(bool* eos) {
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_TIMER(_exec_timer);
     SCOPED_ATTACH_TASK(_state);
-    _eos = _sink->is_finished(_state) || _eos;
+    _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
     *eos = _eos;
     if (_eos) {
         // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
@@ -307,6 +318,11 @@ Status PipelineTask::execute(bool* eos) {
     if (_wait_to_start()) {
         return Status::OK();
     }
+    if (_wake_up_by_downstream) {
+        _eos = true;
+        *eos = true;
+        return Status::OK();
+    }
     // The status must be runnable
     if (!_opened && !_fragment_context->is_canceled()) {
         RETURN_IF_ERROR(_open());
@@ -316,6 +332,11 @@ Status PipelineTask::execute(bool* eos) {
         if (_is_blocked()) {
             return Status::OK();
         }
+        if (_wake_up_by_downstream) {
+            _eos = true;
+            *eos = true;
+            return Status::OK();
+        }
 
         /// When a task is cancelled,
         /// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
@@ -482,9 +503,10 @@ std::string PipelineTask::debug_string() {
     auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
     fmt::format_to(debug_string_buffer,
                    "PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
-                   "{}, elapse time "
-                   "= {}s], block dependency = {}, is running = {}\noperators: ",
+                   "{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is "
+                   "running = {}\noperators: ",
                    (void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
+                   _wake_up_by_downstream.load(),
                    cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
                    is_running());
     for (size_t i = 0; i < _operators.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index dd2ead4b5dc..223420ea55a 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -135,10 +135,11 @@ public:
     int task_id() const { return _index; };
     bool is_finalized() const { return _finalized; }
 
-    void clear_blocking_state() {
+    void clear_blocking_state(bool wake_up_by_downstream = false) {
         _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
         // We use a lock to assure all dependencies are not deconstructed here.
         std::unique_lock<std::mutex> lc(_dependency_lock);
+        _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
         if (!_finalized) {
             _execution_dep->set_always_ready();
             for (auto* dep : _filter_dependencies) {
@@ -231,6 +232,10 @@ public:
         }
     }
 
+    PipelineId pipeline_id() const { return _pipeline->id(); }
+
+    bool wake_up_by_downstream() const { return _wake_up_by_downstream; }
+
 private:
     friend class RuntimeFilterDependency;
     bool _is_blocked();
@@ -306,11 +311,12 @@ private:
 
     Dependency* _execution_dep = nullptr;
 
-    std::atomic<bool> _finalized {false};
+    std::atomic<bool> _finalized = false;
     std::mutex _dependency_lock;
 
-    std::atomic<bool> _running {false};
-    std::atomic<bool> _eos {false};
+    std::atomic<bool> _running = false;
+    std::atomic<bool> _eos = false;
+    std::atomic<bool> _wake_up_by_downstream = false;
 };
 
 } // namespace doris::pipeline
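
_wake_up_by_downstream is a sticky flag: once any caller passes wake_up_by_downstream = true it never reverts to false. Note that `x = x || y` on a std::atomic<bool> is not a single atomic operation; it stays race-free here only because every write happens under _dependency_lock, while readers such as wake_up_by_downstream() merely load. The bare idiom, reduced to a toy class rather than the real PipelineTask:

    #include <atomic>
    #include <mutex>

    class ToyTask {
    public:
        void clear_blocking_state(bool wake_up_by_downstream = false) {
            std::unique_lock<std::mutex> lc(_dependency_lock);
            // Sticky OR: the load and store below are two separate atomic
            // accesses, so the read-modify-write is serialized by the lock.
            _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
            // ... mark dependencies always-ready here ...
        }

        bool wake_up_by_downstream() const { return _wake_up_by_downstream; }

    private:
        std::mutex _dependency_lock;
        std::atomic<bool> _wake_up_by_downstream = false;
    };
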
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 8be30773ee1..475d3a8065f 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -94,7 +94,7 @@ void _close_task(PipelineTask* task, Status exec_status) {
     }
     task->finalize();
     task->set_running(false);
-    task->fragment_context()->close_a_pipeline();
+    task->fragment_context()->close_a_pipeline(task->pipeline_id());
 }
 
 void TaskScheduler::_do_work(size_t index) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 50e71d28946..577b7ffa157 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -613,11 +613,15 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
 }
 
 Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
-    std::lock_guard<std::mutex> lock(_lock);
-    TUniqueId query_id;
-    query_id.__set_hi(request->query_id().hi());
-    query_id.__set_lo(request->query_id().lo());
-    if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+    std::shared_ptr<QueryContext> q_ctx = nullptr;
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        TUniqueId query_id;
+        query_id.__set_hi(request->query_id().hi());
+        query_id.__set_lo(request->query_id().lo());
+        q_ctx = _get_or_erase_query_ctx(query_id);
+    }
+    if (q_ctx) {
         q_ctx->set_ready_to_execute(Status::OK());
     } else {
         return Status::InternalError(
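
The start_query_execution() change narrows the critical section: the map lookup still runs under _lock, but the returned shared_ptr is used only after the guard is released, so set_ready_to_execute() no longer executes while FragmentMgr's mutex is held. The general shape of the fix, with toy types in place of the real classes:

    #include <map>
    #include <memory>
    #include <mutex>

    struct ToyCtx {
        void set_ready() { /* may block or take other locks */ }
    };

    std::mutex g_lock;
    std::map<int, std::shared_ptr<ToyCtx>> g_ctx_map;

    void start(int query_id) {
        std::shared_ptr<ToyCtx> ctx;
        {
            std::lock_guard<std::mutex> guard(g_lock);  // held only for the lookup
            if (auto it = g_ctx_map.find(query_id); it != g_ctx_map.end()) {
                ctx = it->second;  // the shared_ptr keeps the context alive
            }
        }
        if (ctx) {
            ctx->set_ready();  // runs with g_lock already released
        }
    }
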
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index d2b55d86bc6..84dfdd8e1d2 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -130,6 +130,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter(
             RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
                                                    RuntimeFilterRole::PRODUCER, -1, &merge_filter,
                                                    build_bf_exactly, true));
+            merge_filter->set_ignored();
             iter->second.filters.emplace_back(merge_filter);
         }
         iter->second.merge_time++;
@@ -151,7 +152,6 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(
     }
     *local_merge_filters = &iter->second;
     DCHECK(!iter->second.filters.empty());
-    DCHECK_GT(iter->second.merge_time, 0);
     return Status::OK();
 }
 
@@ -236,6 +236,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     auto filter_id = runtime_filter_desc->filter_id;
     RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options,
                                                     -1, false));
+    cnt_val->filter->set_ignored();
     _filter_map.emplace(filter_id, cnt_val);
     return Status::OK();
 }
@@ -254,6 +255,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc));
     auto filter_id = runtime_filter_desc->filter_id;
     RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options));
+    cnt_val->filter->set_ignored();
 
     std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
     _filter_map.emplace(filter_id, cnt_val);
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index b04b1cdba06..1ca7325e8cb 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -66,6 +66,7 @@ struct SharedHashTableContext {
     std::map<int, RuntimeFilterContextSPtr> runtime_filters;
     std::atomic<bool> signaled = false;
     bool short_circuit_for_null_in_probe_side = false;
+    std::atomic<bool> complete_build_stage = false;
 };
 
 using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp
index cfcbaae4a4e..0476104c2e1 100644
--- a/be/test/exprs/runtime_filter_test.cpp
+++ b/be/test/exprs/runtime_filter_test.cpp
@@ -105,11 +105,6 @@ std::shared_ptr<IRuntimeFilter> create_runtime_filter(TRuntimeFilterType::type t
 
     EXPECT_TRUE(status.ok()) << status.to_string();
 
-    if (auto bf = runtime_filter->get_bloomfilter()) {
-        status = bf->init_with_fixed_length();
-        EXPECT_TRUE(status.ok()) << status.to_string();
-    }
-
     return status.ok() ? runtime_filter : nullptr;
 }
 

