This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch runtimefilter_multi_send in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6fc0667e8e9f2e38adf358c3ceac89b9db8d789d Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Wed Jun 28 21:09:43 2023 +0800 [feature](runtime_filter) MultiCastSender supporting runtime filter on be (#21304) --- .../exec/multi_cast_data_stream_source.cpp | 61 +++++++++++++++++++--- .../pipeline/exec/multi_cast_data_stream_source.h | 23 ++++++-- be/src/pipeline/pipeline_fragment_context.cpp | 3 +- ...nsumer_node.cpp => runtime_filter_consumer.cpp} | 44 +++++++++------- ...r_consumer_node.h => runtime_filter_consumer.h} | 28 ++++++---- be/src/vec/exec/scan/vscan_node.cpp | 4 +- be/src/vec/exec/scan/vscan_node.h | 10 ++-- be/src/vec/exec/vselect_node.cpp | 20 ++----- be/src/vec/exec/vselect_node.h | 8 +-- gensrc/thrift/DataSinks.thrift | 12 +++++ 10 files changed, 146 insertions(+), 67 deletions(-) diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 06211faf52..18ca62ea15 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -27,14 +27,16 @@ namespace doris::pipeline { MultiCastDataStreamerSourceOperatorBuilder::MultiCastDataStreamerSourceOperatorBuilder( - int32_t id, const int consumer_id, std::shared_ptr<MultiCastDataStreamer>& data_streamer) + int32_t id, const int consumer_id, std::shared_ptr<MultiCastDataStreamer>& data_streamer, + const TDataStreamSink& sink) : OperatorBuilderBase(id, "MultiCastDataStreamerSourceOperator"), _consumer_id(consumer_id), - _multi_cast_data_streamer(data_streamer) {}; + _multi_cast_data_streamer(data_streamer), + _t_data_stream_sink(sink) {} OperatorPtr MultiCastDataStreamerSourceOperatorBuilder::build_operator() { - return std::make_shared<MultiCastDataStreamerSourceOperator>(this, _consumer_id, - _multi_cast_data_streamer); + return std::make_shared<MultiCastDataStreamerSourceOperator>( + this, _consumer_id, _multi_cast_data_streamer, _t_data_stream_sink); } const 
RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() { @@ -43,10 +45,44 @@ const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() { MultiCastDataStreamerSourceOperator::MultiCastDataStreamerSourceOperator( OperatorBuilderBase* operator_builder, const int consumer_id, - std::shared_ptr<MultiCastDataStreamer>& data_streamer) + std::shared_ptr<MultiCastDataStreamer>& data_streamer, const TDataStreamSink& sink) : OperatorBase(operator_builder), + vectorized::RuntimeFilterConsumer(sink.dest_node_id, sink.runtime_filters, + data_streamer->row_desc(), _conjuncts), _consumer_id(consumer_id), - _multi_cast_data_streamer(data_streamer) {}; + _multi_cast_data_streamer(data_streamer), + _t_data_stream_sink(sink) {} + +Status MultiCastDataStreamerSourceOperator::init(const TDataSink& tsink) { + RETURN_IF_ERROR(OperatorBase::init(tsink)); + if (_t_data_stream_sink.__isset.output_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs, + _output_expr_contexts)); + } + + if (_t_data_stream_sink.__isset.conjuncts) { + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, _conjuncts)); + } + + return Status::OK(); +} + +Status MultiCastDataStreamerSourceOperator::prepare(doris::RuntimeState* state) { + RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state)); + _register_runtime_filter(); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, row_desc())); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_contexts, state, row_desc())); + return Status::OK(); +} + +Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) { + return _acquire_runtime_filter(state); +} + +bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() { + return vectorized::RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout(); +} bool MultiCastDataStreamerSourceOperator::can_read() { return 
_multi_cast_data_streamer->can_read(_consumer_id); @@ -56,6 +92,19 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto SourceState& source_state) { bool eos = false; _multi_cast_data_streamer->pull(_consumer_id, block, &eos); + if (!_output_expr_contexts.empty()) { + vectorized::Block output_block; + RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs( + _output_expr_contexts, *block, &output_block)); + materialize_block_inplace(output_block); + block->swap(output_block); + } + + if (!_conjuncts.empty()) { + RETURN_IF_ERROR( + vectorized::VExprContext::filter_block(_conjuncts, block, block->columns())); + } + if (eos) { source_state = SourceState::FINISHED; } diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 15bd320b89..b2c7ca9e55 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -22,6 +22,7 @@ #include "common/status.h" #include "operator.h" +#include "vec/exec/runtime_filter_consumer.h" namespace doris { class ExecNode; @@ -37,7 +38,8 @@ class MultiCastDataStreamer; class MultiCastDataStreamerSourceOperatorBuilder final : public OperatorBuilderBase { public: MultiCastDataStreamerSourceOperatorBuilder(int32_t id, const int consumer_id, - std::shared_ptr<MultiCastDataStreamer>&); + std::shared_ptr<MultiCastDataStreamer>&, + const TDataStreamSink&); bool is_source() const override { return true; } @@ -48,20 +50,27 @@ public: private: const int _consumer_id; std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer; + TDataStreamSink _t_data_stream_sink; }; -class MultiCastDataStreamerSourceOperator final : public OperatorBase { +class MultiCastDataStreamerSourceOperator final : public OperatorBase, + public vectorized::RuntimeFilterConsumer { public: MultiCastDataStreamerSourceOperator(OperatorBuilderBase* operator_builder, const int consumer_id, - 
std::shared_ptr<MultiCastDataStreamer>& data_streamer); + std::shared_ptr<MultiCastDataStreamer>& data_streamer, + const TDataStreamSink& sink); Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) override; - Status prepare(RuntimeState* state) override { return Status::OK(); }; + Status init(const TDataSink& tsink) override; - Status open(RuntimeState* state) override { return Status::OK(); }; + Status prepare(RuntimeState* state) override; + + Status open(RuntimeState* state) override; + + bool runtime_filters_are_ready_or_timeout() override; Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override { return Status::OK(); @@ -76,6 +85,10 @@ public: private: const int _consumer_id; std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer; + TDataStreamSink _t_data_stream_sink; + + vectorized::VExprContextSPtrs _output_expr_contexts; + vectorized::VExprContextSPtrs _conjuncts; }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 7c24be8ded..a97128ee32 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -776,7 +776,8 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr // 2. create and set the source operator of multi_cast_data_stream_source for new pipeline OperatorBuilderPtr source_op = std::make_shared<MultiCastDataStreamerSourceOperatorBuilder>( - next_operator_builder_id(), i, multi_cast_data_streamer); + next_operator_builder_id(), i, multi_cast_data_streamer, + thrift_sink.multi_cast_stream_sink.sinks[i]); new_pipeline->add_operator(source_op); // 3. 
create and set sink operator of data stream sender for new pipeline diff --git a/be/src/vec/exec/runtime_filter_consumer_node.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp similarity index 77% rename from be/src/vec/exec/runtime_filter_consumer_node.cpp rename to be/src/vec/exec/runtime_filter_consumer.cpp index dd631ce66e..ed74d7ecd0 100644 --- a/be/src/vec/exec/runtime_filter_consumer_node.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -15,22 +15,26 @@ // specific language governing permissions and limitations // under the License. -#include "vec/exec/runtime_filter_consumer_node.h" +#include "vec/exec/runtime_filter_consumer.h" namespace doris::vectorized { -RuntimeFilterConsumerNode::RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {} +RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, + const std::vector<TRuntimeFilterDesc>& runtime_filters, + const RowDescriptor& row_descriptor, + VExprContextSPtrs& conjuncts) + : _filter_id(filter_id), + _runtime_filter_descs(runtime_filters), + _row_descriptor_ref(row_descriptor), + _conjuncts_ref(conjuncts) {} -Status RuntimeFilterConsumerNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::init(tnode, state)); +Status RuntimeFilterConsumer::init(RuntimeState* state) { _state = state; RETURN_IF_ERROR(_register_runtime_filter()); return Status::OK(); } -Status RuntimeFilterConsumerNode::_register_runtime_filter() { +Status RuntimeFilterConsumer::_register_runtime_filter() { int filter_size = _runtime_filter_descs.size(); _runtime_filter_ctxs.reserve(filter_size); _runtime_filter_ready_flag.reserve(filter_size); @@ -43,14 +47,14 @@ Status RuntimeFilterConsumerNode::_register_runtime_filter() { // 1. All BE and FE has been upgraded (e.g. opt_remote_rf) // 2. 
This filter is bloom filter (only bloom filter should be used for merging) RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), id(), false)); + filter_desc, _state->query_options(), _filter_id, false)); RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, id(), &runtime_filter)); + filter_desc.filter_id, _filter_id, &runtime_filter)); } else { RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), id(), false)); + filter_desc, _state->query_options(), _filter_id, false)); RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter( - filter_desc.filter_id, id(), &runtime_filter)); + filter_desc.filter_id, _filter_id, &runtime_filter)); } _runtime_filter_ctxs.emplace_back(runtime_filter); _runtime_filter_ready_flag.emplace_back(false); @@ -58,7 +62,7 @@ Status RuntimeFilterConsumerNode::_register_runtime_filter() { return Status::OK(); } -bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() { +bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() { if (!_blocked_by_rf) { return true; } @@ -72,7 +76,7 @@ bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() { return true; } -Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) { +Status RuntimeFilterConsumer::_acquire_runtime_filter(bool wait) { SCOPED_TIMER(_acquire_runtime_filter_timer); VExprSPtrs vexprs; for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { @@ -101,23 +105,23 @@ Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) { return Status::OK(); } -Status RuntimeFilterConsumerNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) { +Status RuntimeFilterConsumer::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) { if (vexprs.empty()) { return Status::OK(); } for (auto& expr : vexprs) { VExprContextSPtr conjunct = 
VExprContext::create_shared(expr); - RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor)); + RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref)); RETURN_IF_ERROR(conjunct->open(_state)); _rf_vexpr_set.insert(expr); - _conjuncts.emplace_back(conjunct); + _conjuncts_ref.emplace_back(conjunct); } return Status::OK(); } -Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { +Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { if (_is_all_rf_applied) { *arrived_rf_num = _runtime_filter_descs.size(); return Status::OK(); @@ -140,12 +144,12 @@ Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar continue; } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) { RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs( - &exprs, _row_descriptor, _state)); + &exprs, _row_descriptor_ref, _state)); ++current_arrived_rf_num; _runtime_filter_ctxs[i].apply_mark = true; } } - // 2. Append unapplied runtime filters to vconjunct_ctx_ptr + // 2. 
Append unapplied runtime filters to _conjuncts if (!exprs.empty()) { RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs)); } @@ -157,7 +161,7 @@ Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar return Status::OK(); } -void RuntimeFilterConsumerNode::_prepare_rf_timer(RuntimeProfile* profile) { +void RuntimeFilterConsumer::_prepare_rf_timer(RuntimeProfile* profile) { _acquire_runtime_filter_timer = ADD_TIMER(profile, "AcquireRuntimeFilterTime"); } diff --git a/be/src/vec/exec/runtime_filter_consumer_node.h b/be/src/vec/exec/runtime_filter_consumer.h similarity index 81% rename from be/src/vec/exec/runtime_filter_consumer_node.h rename to be/src/vec/exec/runtime_filter_consumer.h index 518e0e865c..18e92abc90 100644 --- a/be/src/vec/exec/runtime_filter_consumer_node.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -22,14 +22,16 @@ namespace doris::vectorized { -class RuntimeFilterConsumerNode : public ExecNode { +class RuntimeFilterConsumer { public: - RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - ~RuntimeFilterConsumerNode() override = default; + RuntimeFilterConsumer(const int32_t filter_id, + const std::vector<TRuntimeFilterDesc>& runtime_filters, + const RowDescriptor& row_descriptor, VExprContextSPtrs& conjuncts); + ~RuntimeFilterConsumer() = default; - Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + Status init(RuntimeState* state); - // Try append late arrived runtime filters. + // Try to append late arrived runtime filters. // Return num of filters which are applied already. Status try_append_late_arrival_runtime_filter(int* arrived_rf_num); @@ -54,15 +56,23 @@ protected: IRuntimeFilter* runtime_filter; }; - RuntimeState* _state; - std::vector<RuntimeFilterContext> _runtime_filter_ctxs; - - std::vector<TRuntimeFilterDesc> _runtime_filter_descs; // Set to true if the runtime filter is ready. 
std::vector<bool> _runtime_filter_ready_flag; doris::Mutex _rf_locks; phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set; + +private: + RuntimeState* _state; + + int32_t _filter_id; + + std::vector<TRuntimeFilterDesc> _runtime_filter_descs; + + const RowDescriptor& _row_descriptor_ref; + + VExprContextSPtrs& _conjuncts_ref; + // True means all runtime filters are applied to scanners bool _is_all_rf_applied = true; bool _blocked_by_rf = false; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 2af6fc87c5..e85775ffa1 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -94,7 +94,9 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) { } Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(RuntimeFilterConsumerNode::init(tnode, state)); + RETURN_IF_ERROR(ExecNode::init(tnode, state)); + RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); + _state = state; _is_pipeline_scan = state->enable_pipeline_exec(); const TQueryOptions& query_options = state->query_options(); diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 112ca47b54..52284ffe95 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -43,7 +43,7 @@ #include "runtime/runtime_state.h" #include "util/lock.h" #include "util/runtime_profile.h" -#include "vec/exec/runtime_filter_consumer_node.h" +#include "vec/exec/runtime_filter_consumer.h" #include "vec/exec/scan/scanner_context.h" #include "vec/exec/scan/vscanner.h" #include "vec/runtime/shared_scanner_controller.h" @@ -88,10 +88,12 @@ struct FilterPredicates { std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters; }; -class VScanNode : public RuntimeFilterConsumerNode { +class VScanNode : public ExecNode, public RuntimeFilterConsumer { public: VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : 
RuntimeFilterConsumerNode(pool, tnode, descs) { + : ExecNode(pool, tnode, descs), + RuntimeFilterConsumer(id(), tnode.runtime_filters, ExecNode::_row_descriptor, + ExecNode::_conjuncts) { if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) { // Which means the request could be fullfilled in a single segment iterator request. if (tnode.limit > 0 && tnode.limit < 1024) { @@ -304,6 +306,8 @@ protected: VExprContextSPtrs _stale_expr_ctxs; VExprContextSPtrs _common_expr_ctxs_push_down; + RuntimeState* _state; + // If sort info is set, push limit to each scanner; int64_t _limit_per_scanner = -1; diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp index 626fd5ce96..ee1628cd19 100644 --- a/be/src/vec/exec/vselect_node.cpp +++ b/be/src/vec/exec/vselect_node.cpp @@ -37,34 +37,22 @@ class TPlanNode; namespace vectorized { VSelectNode::VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : RuntimeFilterConsumerNode(pool, tnode, descs), _child_eos(false) {} + : ExecNode(pool, tnode, descs), _child_eos(false) {} Status VSelectNode::init(const TPlanNode& tnode, RuntimeState* state) { - return RuntimeFilterConsumerNode::init(tnode, state); + return ExecNode::init(tnode, state); } Status VSelectNode::prepare(RuntimeState* state) { - return RuntimeFilterConsumerNode::prepare(state); + return ExecNode::prepare(state); } Status VSelectNode::open(RuntimeState* state) { - RETURN_IF_ERROR(RuntimeFilterConsumerNode::open(state)); + RETURN_IF_ERROR(ExecNode::open(state)); RETURN_IF_ERROR(child(0)->open(state)); return Status::OK(); } -Status VSelectNode::alloc_resource(RuntimeState* state) { - if (_opened) { - return Status::OK(); - } - - RETURN_IF_ERROR(RuntimeFilterConsumerNode::alloc_resource(state)); - RETURN_IF_ERROR(_acquire_runtime_filter()); - RETURN_IF_CANCELLED(state); - _opened = true; - return Status::OK(); -} - Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { 
SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); diff --git a/be/src/vec/exec/vselect_node.h b/be/src/vec/exec/vselect_node.h index 140009e4b3..1b425e80af 100644 --- a/be/src/vec/exec/vselect_node.h +++ b/be/src/vec/exec/vselect_node.h @@ -17,7 +17,7 @@ #pragma once #include "common/status.h" -#include "vec/exec/runtime_filter_consumer_node.h" +#include "exec/exec_node.h" namespace doris { class DescriptorTbl; @@ -28,7 +28,7 @@ class TPlanNode; namespace vectorized { class Block; -class VSelectNode final : public RuntimeFilterConsumerNode { +class VSelectNode final : public ExecNode { public: VSelectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; @@ -37,13 +37,9 @@ public: Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; Status close(RuntimeState* state) override; Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; - - Status alloc_resource(RuntimeState* state) override; - private: // true if last get_next() call on child signalled eos bool _child_eos; - bool _opened = false; }; } // namespace vectorized } // namespace doris \ No newline at end of file diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift index af5d4d26a3..c78a7900a9 100644 --- a/gensrc/thrift/DataSinks.thrift +++ b/gensrc/thrift/DataSinks.thrift @@ -153,6 +153,18 @@ struct TDataStreamSink { 2: required Partitions.TDataPartition output_partition 3: optional bool ignore_not_found + + // per-destination projections + 4: optional list<Exprs.TExpr> output_exprs + + // project output tuple id + 5: optional Types.TTupleId output_tuple_id + + // per-destination filters + 6: optional list<Exprs.TExpr> conjuncts + + // per-destination runtime filters + 7: optional list<PlanNodes.TRuntimeFilterDesc> runtime_filters } struct TMultiCastDataStreamSink { 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org