This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 39669c6df2 [feature](pipelineX) add runtimefliter in pipelineX multicast sink (#25120) 39669c6df2 is described below commit 39669c6df298df57c3924dace5263c39defc29ff Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Tue Oct 10 10:41:08 2023 +0800 [feature](pipelineX) add runtimefliter in pipelineX multicast sink (#25120) --- .../exec/multi_cast_data_stream_source.cpp | 10 +++++++++ .../pipeline/exec/multi_cast_data_stream_source.h | 26 ++++++++++++++++++---- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index c70d87f59e..97f7d9e573 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -125,8 +125,16 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const return _multi_cast_data_streamer->profile(); } +MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state, + OperatorXBase* parent) + : Base(state, parent), + vectorized::RuntimeFilterConsumer( + static_cast<Parent*>(parent)->dest_id_from_sink(), parent->runtime_filter_descs(), + static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {}; + Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); + RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast<Parent>(); @@ -134,6 +142,8 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); } + // init profile for runtime filter + RuntimeFilterConsumer::_init_profile(profile()); return Status::OK(); } diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 6377b5ef16..943c62d077 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -94,15 +94,21 @@ private: class MultiCastDataStreamerSourceOperatorX; -class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastDependency> { +class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastDependency>, + public vectorized::RuntimeFilterConsumer { public: ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); using Base = PipelineXLocalState<MultiCastDependency>; using Parent = MultiCastDataStreamerSourceOperatorX; - MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) - : Base(state, parent) {}; - + MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + RETURN_IF_ERROR(_acquire_runtime_filter()); + return Status::OK(); + } + friend class MultiCastDataStreamerSourceOperatorX; private: @@ -163,6 +169,18 @@ public: bool is_source() const override { return true; } + const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { + return _t_data_stream_sink.runtime_filters; + } + + int dest_id_from_sink() const { return _t_data_stream_sink.dest_node_id; } + + bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const override { + return state->get_local_state(id()) + ->template cast<MultiCastDataStreamSourceLocalState>() + .runtime_filters_are_ready_or_timeout(); + } + private: friend class MultiCastDataStreamSourceLocalState; const int _consumer_id; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org