This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 39669c6df2 [feature](pipelineX) add runtimefliter in pipelineX 
multicast sink  (#25120)
39669c6df2 is described below

commit 39669c6df298df57c3924dace5263c39defc29ff
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Tue Oct 10 10:41:08 2023 +0800

    [feature](pipelineX) add runtimefliter in pipelineX multicast sink  (#25120)
---
 .../exec/multi_cast_data_stream_source.cpp         | 10 +++++++++
 .../pipeline/exec/multi_cast_data_stream_source.h  | 26 ++++++++++++++++++----
 2 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index c70d87f59e..97f7d9e573 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -125,8 +125,16 @@ RuntimeProfile* 
MultiCastDataStreamerSourceOperator::get_runtime_profile() const
     return _multi_cast_data_streamer->profile();
 }
 
+MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState*
 state,
+                                                                         
OperatorXBase* parent)
+        : Base(state, parent),
+          vectorized::RuntimeFilterConsumer(
+                  static_cast<Parent*>(parent)->dest_id_from_sink(), 
parent->runtime_filter_descs(),
+                  static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {};
+
 Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
     SCOPED_TIMER(profile()->total_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<Parent>();
@@ -134,6 +142,8 @@ Status 
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
         RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, 
_output_expr_contexts[i]));
     }
+    // init profile for runtime filter
+    RuntimeFilterConsumer::_init_profile(profile());
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 6377b5ef16..943c62d077 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -94,15 +94,21 @@ private:
 
 class MultiCastDataStreamerSourceOperatorX;
 
-class MultiCastDataStreamSourceLocalState final : public 
PipelineXLocalState<MultiCastDependency> {
+class MultiCastDataStreamSourceLocalState final : public 
PipelineXLocalState<MultiCastDependency>,
+                                                  public 
vectorized::RuntimeFilterConsumer {
 public:
     ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState);
     using Base = PipelineXLocalState<MultiCastDependency>;
     using Parent = MultiCastDataStreamerSourceOperatorX;
-    MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* 
parent)
-            : Base(state, parent) {};
-
+    MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* 
parent);
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+
+    Status open(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::open(state));
+        RETURN_IF_ERROR(_acquire_runtime_filter());
+        return Status::OK();
+    }
+
     friend class MultiCastDataStreamerSourceOperatorX;
 
 private:
@@ -163,6 +169,18 @@ public:
 
     bool is_source() const override { return true; }
 
+    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
+        return _t_data_stream_sink.runtime_filters;
+    }
+
+    int dest_id_from_sink() const { return _t_data_stream_sink.dest_node_id; }
+
+    bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const 
override {
+        return state->get_local_state(id())
+                ->template cast<MultiCastDataStreamSourceLocalState>()
+                .runtime_filters_are_ready_or_timeout();
+    }
+
 private:
     friend class MultiCastDataStreamSourceLocalState;
     const int _consumer_id;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to