github-actions[bot] commented on code in PR #24656: URL: https://github.com/apache/doris/pull/24656#discussion_r1333833823
########## be/src/pipeline/exec/union_source_operator.cpp: ########## @@ -103,25 +103,53 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { SCOPED_TIMER(profile()->total_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast<Parent>(); - std::shared_ptr<DataQueue> data_queue = std::make_shared<DataQueue>(p._child_size, _dependency); - _shared_state->data_queue.swap(data_queue); + // Const exprs materialized by this node. These exprs don't refer to any children. + // Only materialized by the first fragment instance to avoid duplication. + if (state->per_fragment_instance_idx() == 0) { + auto clone_expr_list = [&](vectorized::VExprContextSPtrs& cur_expr_list, + vectorized::VExprContextSPtrs& other_expr_list) { + cur_expr_list.resize(other_expr_list.size()); + for (int i = 0; i < cur_expr_list.size(); i++) { + RETURN_IF_ERROR(other_expr_list[i]->clone(state, cur_expr_list[i])); + } + return Status::OK(); + }; + _const_expr_lists.resize(p._const_expr_lists.size()); + for (int i = 0; i < _const_expr_lists.size(); i++) { + auto& _const_expr_list = _const_expr_lists[i]; + auto& other_expr_list = p._const_expr_lists[i]; + RETURN_IF_ERROR(clone_expr_list(_const_expr_list, other_expr_list)); + } + } return Status::OK(); } +std::shared_ptr<DataQueue> UnionSourceLocalState::data_queue() { + auto& p = _parent->cast<Parent>(); + std::shared_ptr<DataQueue> data_queue = std::make_shared<DataQueue>(p._child_size, _dependency); + return data_queue; +} + Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state) { CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state); SCOPED_TIMER(local_state.profile()->total_time_counter()); - std::unique_ptr<vectorized::Block> output_block = vectorized::Block::create_unique(); - int child_idx = 0; - local_state._shared_state->data_queue->get_block_from_queue(&output_block, &child_idx); - if (!output_block) { - return Status::OK(); + if (local_state._need_read_for_const_expr) { + if (has_more_const(state)) { + get_next_const(state, block); + } + local_state._need_read_for_const_expr = has_more_const(state); + } else { + std::unique_ptr<vectorized::Block> output_block = vectorized::Block::create_unique(); Review Comment: warning: variable 'output_block' is not initialized [cppcoreguidelines-init-variables] ```suggestion { = 0 ``` ########## be/src/pipeline/exec/union_source_operator.h: ########## @@ -98,16 +104,50 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> { bool is_source() const override { return true; } + Status init(const TPlanNode& tnode, RuntimeState* state) override { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.union_node); + // Create const_expr_ctx_lists_ from thrift exprs. + auto& const_texpr_lists = tnode.union_node.const_expr_lists; + for (auto& texprs : const_texpr_lists) { + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _const_expr_lists.push_back(ctxs); + } + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + Base::prepare(state); + // Prepare const expr lists. + for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) { + RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state, _row_descriptor)); + } + return Status::OK(); + } + Status open(RuntimeState* state) override { + Base::open(state); Review Comment: warning: variable 'state' is not initialized [cppcoreguidelines-init-variables] ```suggestion Base::open(state = 0); ``` ########## be/src/pipeline/exec/multi_cast_data_stream_source.h: ########## @@ -83,11 +86,156 @@ class MultiCastDataStreamerSourceOperator final : public OperatorBase, private: const int _consumer_id; std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer; - TDataStreamSink _t_data_stream_sink; + const TDataStreamSink _t_data_stream_sink; vectorized::VExprContextSPtrs _output_expr_contexts; vectorized::VExprContextSPtrs _conjuncts; }; +class MultiCastDataStreamerSourceOperatorX; + +class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastDependency> { +public: + ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); + using Base = PipelineXLocalState<MultiCastDependency>; + using Parent = MultiCastDataStreamerSourceOperatorX; + MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) + : Base(state, parent) {}; + + Status init(RuntimeState* state, LocalStateInfo& info) override; + friend class MultiCastDataStreamerSourceOperatorX; + +private: + vectorized::VExprContextSPtrs _output_expr_contexts; +}; +class MultiCastDataStreamerSourceOperatorX final + : public OperatorX<MultiCastDataStreamSourceLocalState> { +public: + using Base = OperatorX<MultiCastDataStreamSourceLocalState>; + MultiCastDataStreamerSourceOperatorX(const int consumer_id, ObjectPool* pool, + const TDataStreamSink& sink, + const RowDescriptor& row_descriptor, int id) + : Base(pool, id), + _consumer_id(consumer_id), + _t_data_stream_sink(sink), + _row_descriptor(row_descriptor) {}; + ~MultiCastDataStreamerSourceOperatorX() override = default; + Dependency* wait_for_dependency(RuntimeState* state) override { + CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); + return local_state._dependency->can_read(_consumer_id); + } + + Status prepare(RuntimeState* state) override { + RETURN_IF_ERROR(Base::prepare(state)); + // RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state)); + // init profile for runtime filter + // RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile()); + if (_t_data_stream_sink.__isset.output_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs, + _output_expr_contexts)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_contexts, state, _row_desc())); + } + + if (_t_data_stream_sink.__isset.conjuncts) { + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, + _conjuncts)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, _row_desc())); + } + return Status::OK(); + } + + Status open(RuntimeState* state) override { + RETURN_IF_ERROR(Base::open(state)); + if (_t_data_stream_sink.__isset.output_exprs) { + RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts, state)); + } + if (_t_data_stream_sink.__isset.conjuncts) { + RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state)); + } + return Status::OK(); + } + + Status get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state) override; + + bool is_source() const override { return true; } Review Comment: warning: function 'is_source' should be marked [[nodiscard]] [modernize-use-nodiscard] ```suggestion [[nodiscard]] bool is_source() const override { return true; } ``` ########## be/src/pipeline/pipeline_x/dependency.h: ########## @@ -17,9 +17,12 @@ #pragma once +#include <sqltypes.h> Review Comment: warning: 'sqltypes.h' file not found [clang-diagnostic-error] ```cpp #include <sqltypes.h> ^ ``` ########## be/src/pipeline/pipeline_x/pipeline_x_task.cpp: ########## @@ -62,20 +63,32 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_prepare_timer); - LocalSinkStateInfo sink_info {_parent_profile, local_params.sender_id, - get_downstream_dependency().get()}; - RETURN_IF_ERROR(_sink->setup_local_state(state, sink_info)); + { + // set sink local state + auto& deps = get_downstream_dependency(); + std::vector<LocalSinkStateInfo> infos; Review Comment: warning: variable 'infos' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::vector<LocalSinkStateInfo> infos = 0; ``` ########## be/src/pipeline/exec/union_source_operator.h: ########## @@ -98,16 +104,50 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> { bool is_source() const override { return true; } + Status init(const TPlanNode& tnode, RuntimeState* state) override { + RETURN_IF_ERROR(Base::init(tnode, state)); + DCHECK(tnode.__isset.union_node); + // Create const_expr_ctx_lists_ from thrift exprs. + auto& const_texpr_lists = tnode.union_node.const_expr_lists; + for (auto& texprs : const_texpr_lists) { + vectorized::VExprContextSPtrs ctxs; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(texprs, ctxs)); + _const_expr_lists.push_back(ctxs); + } + return Status::OK(); + } + + Status prepare(RuntimeState* state) override { + Base::prepare(state); Review Comment: warning: variable 'state' is not initialized [cppcoreguidelines-init-variables] ```suggestion Base::prepare(state = 0); ``` ########## be/src/pipeline/pipeline_x/pipeline_x_task.cpp: ########## @@ -62,20 +63,32 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_prepare_timer); - LocalSinkStateInfo sink_info {_parent_profile, local_params.sender_id, - get_downstream_dependency().get()}; - RETURN_IF_ERROR(_sink->setup_local_state(state, sink_info)); + { + // set sink local state + auto& deps = get_downstream_dependency(); + std::vector<LocalSinkStateInfo> infos; + for (auto& dep : deps) { + infos.push_back(LocalSinkStateInfo {_pipeline->pipeline_profile(), + local_params.sender_id, dep.get()}); + } + RETURN_IF_ERROR(_sink->setup_local_states(state, infos)); + } std::vector<TScanRangeParams> no_scan_ranges; auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, _operators.front()->id(), no_scan_ranges); for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { - LocalStateInfo info { - op_idx == _operators.size() - 1 - ? _parent_profile - : state->get_local_state(_operators[op_idx + 1]->id())->profile(), - scan_ranges, get_upstream_dependency(_operators[op_idx]->id())}; - RETURN_IF_ERROR(_operators[op_idx]->setup_local_state(state, info)); + auto& deps = get_upstream_dependency(_operators[op_idx]->id()); + std::vector<LocalStateInfo> infos; Review Comment: warning: variable 'infos' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::vector<LocalStateInfo> infos = 0; ``` ########## be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp: ########## @@ -253,6 +255,56 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData _sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink)); break; } + case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { + DCHECK(thrift_sink.__isset.multi_cast_stream_sink); + DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0); + // TODO: figure out good buffer size based on size of output row + /// TODO: Here is a magic number, and we will refactor this part later. + static int sink_count = 120000; + auto sink_id = sink_count++; + auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size(); + // one sink has multiple sources. + std::vector<int> sources; Review Comment: warning: variable 'sources' is not initialized [cppcoreguidelines-init-variables] ```suggestion std::vector<int> sources = 0; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org