github-actions[bot] commented on code in PR #42341: URL: https://github.com/apache/doris/pull/42341#discussion_r1812483465
########## be/src/pipeline/exec/exchange_sink_buffer.h: ########## @@ -22,9 +22,9 @@ #include <gen_cpp/internal_service.pb.h> #include <gen_cpp/types.pb.h> #include <parallel_hashmap/phmap.h> +#include <stdint.h> Review Comment: warning: inclusion of deprecated C++ header 'stdint.h'; consider using 'cstdint' instead [modernize-deprecated-headers] ```suggestion #include <cstdint> ``` ########## be/src/pipeline/exec/exchange_sink_operator.cpp: ########## @@ -107,28 +108,14 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _wait_channel_timer.push_back(_profile->add_nonzero_counter( fmt::format("WaitForLocalExchangeBuffer{}", i), TUnit ::TIME_NS, timer_name, 1)); } + + _sink_buffer = p._sink_buffer; + _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); + _sink_buffer->inc_running_sink(this); _wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name); return Status::OK(); } -void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) { - std::lock_guard<std::mutex> lock(_finished_channels_mutex); - - if (_finished_channels.contains(channel_id)) { - LOG(WARNING) << "query: " << print_id(_state->query_id()) - << ", on_channel_finished on already finished channel: " << channel_id; - return; - } else { - _finished_channels.emplace(channel_id); - if (_working_channels_count.fetch_sub(1) == 1) { - set_reach_limit(); - if (_finish_dependency) { - _finish_dependency->set_ready(); - } - } - } -} - Status ExchangeSinkLocalState::open(RuntimeState* state) { Review Comment: warning: function 'open' has cognitive complexity of 64 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status ExchangeSinkLocalState::open(RuntimeState* state) { ^ ``` <details> <summary>Additional context</summary> **be/src/pipeline/exec/exchange_sink_operator.cpp:120:** nesting level increased to 1 ```cpp SCOPED_TIMER(_open_timer); ^ ``` **be/src/util/runtime_profile.h:67:** expanded from macro 'SCOPED_TIMER' ```cpp #define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c) ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:121:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp RETURN_IF_ERROR(Base::open(state)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:121:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(Base::open(state)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:124:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM || ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:144:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!only_local_exchange) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:147:** +1, nesting level increased to 1 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:151:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:157:** +1, nesting level increased to 1 ```cpp } else if (local_size > 0) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:172:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_part_type == TPartitionType::HASH_PARTITIONED) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:176:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_partitioner->init(p._texprs)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:176:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_partitioner->init(p._texprs)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:177:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:177:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:180:** +1, nesting level increased to 1 ```cpp } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:184:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_partitioner->init(p._texprs)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:184:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_partitioner->init(p._texprs)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:185:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:185:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:188:** +1, nesting level increased to 1 ```cpp } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:194:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:194:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_schema->init(p._tablet_sink_schema)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:196:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_vpartition->init()); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:196:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_vpartition->init()); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:226:** +1, nesting level increased to 1 ```cpp } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:249:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_partitioner->init(p._texprs)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:249:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_partitioner->init(p._texprs)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:250:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:250:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:255:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_part_type == TPartitionType::HASH_PARTITIONED || ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:258:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_partitioner->open(state)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:258:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_partitioner->open(state)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:259:** +1, nesting level increased to 1 ```cpp } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:260:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:260:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_row_distribution.open(_tablet_sink_row_desc)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` </details> ########## be/src/vec/sink/vdata_stream_sender.h: ########## @@ -119,106 +131,98 @@ Status init(RuntimeState* state); Status open(RuntimeState* state); - Status send_local_block(Block* block, bool eos, bool can_be_moved); - // Flush buffered rows and close channel. This function don't wait the response - // of close operation, client should call close_wait() to finish channel's close. - // We split one close operation into two phases in order to make multiple channels - // can run parallel. - Status close(RuntimeState* state); + Status send_local_block(Status exec_status, bool eos = false); + + Status send_local_block(Block* block, bool can_be_moved); + + // Get close wait's response, to finish channel close operation. + Status close_wait(RuntimeState* state); + + int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; } + + PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; } std::string get_fragment_instance_id_str() { - UniqueId uid(_fragment_instance_id); + UniqueId uid(_dest_fragment_instance_id); return uid.to_string(); } bool is_local() const { return _is_local; } + virtual void ch_roll_pb_block(); + bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); } void set_receiver_eof(Status st) { _receiver_status = st; } - int64_t mem_usage() const; - - // Asynchronously sends a block - // Returns the status of the most recently finished transmit_data - // rpc (or OK if there wasn't one that hasn't been reported yet). - // if batch is nullptr, send the eof packet - Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false); - Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos = false); - - Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) { - if (_fragment_instance_id.lo == -1) { - return Status::OK(); - } - - bool serialized = false; - if (_pblock == nullptr) { - _pblock = std::make_unique<PBlock>(); - } - RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos, - &rows)); - if (serialized) { - RETURN_IF_ERROR(_send_current_block(eos)); - } - - return Status::OK(); - } - - void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { - _buffer = buffer; - _buffer->register_sink(_fragment_instance_id); - } - - std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback( - InstanceLoId id, bool eos) { - if (!_send_callback) { - _send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared(); - } else { - _send_callback->cntl_->Reset(); +protected: + bool _recvr_is_valid() { + if (_local_recvr && !_local_recvr->is_closed()) { + return true; } - _send_callback->init(id, eos); - return _send_callback; + _receiver_status = Status::EndOfFile( + "local data stream receiver closed"); // local data stream receiver closed + return false; } - std::shared_ptr<pipeline::Dependency> get_local_channel_dependency(); - -protected: - Status _send_local_block(bool eos); - Status _send_current_block(bool eos); - - Status _recvr_status() const { - if (_local_recvr && !_local_recvr->is_closed()) { + Status _wait_last_brpc() { + SCOPED_TIMER(_parent->brpc_wait_timer()); + if (_send_remote_block_callback == nullptr) { return Status::OK(); } - return Status::EndOfFile( - "local data stream receiver closed"); // local data stream receiver closed + _send_remote_block_callback->join(); + if (_send_remote_block_callback->cntl_->Failed()) { + std::string err = fmt::format( + "failed to send brpc batch, error={}, error_text={}, client: {}, " + "latency = {}", + berror(_send_remote_block_callback->cntl_->ErrorCode()), + _send_remote_block_callback->cntl_->ErrorText(), + BackendOptions::get_localhost(), + _send_remote_block_callback->cntl_->latency_us()); + LOG(WARNING) << err; + return Status::RpcError(err); + } + _receiver_status = Status::create(_send_remote_block_callback->response_->status()); + return _receiver_status; } - pipeline::ExchangeSinkLocalState* _parent = nullptr; + Parent* _parent = nullptr; - const TUniqueId _fragment_instance_id; + const RowDescriptor& _row_desc; + const TUniqueId _dest_fragment_instance_id; PlanNodeId _dest_node_id; - bool _closed {false}; - bool _need_close {false}; + + // the number of RowBatch.data bytes sent successfully + int64_t _num_data_bytes_sent {}; + int64_t _packet_seq {}; + + bool _need_close; Review Comment: warning: use default member initializer for '_need_close' [modernize-use-default-member-init] be/src/vec/sink/vdata_stream_sender.h:114: ```diff - _need_close(false), + , ``` ```suggestion bool _need_close{false}; ``` ########## be/src/pipeline/exec/exchange_sink_operator.cpp: ########## @@ -383,7 +365,7 @@ Status st) { channel->set_receiver_eof(st); // Chanel will not send RPC to the downstream when eof, so close chanel by OK status. - static_cast<void>(channel->close(state)); + static_cast<void>(channel->close(state, Status::OK())); } Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { Review Comment: warning: function 'sink' has cognitive complexity of 232 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { ^ ``` <details> <summary>Additional context</summary> **be/src/pipeline/exec/exchange_sink_operator.cpp:382:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (all_receiver_eof) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:390:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:394:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (local_state.only_local_exchange) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:395:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (!block->empty()) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:410:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:414:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(local_state._serializer.next_serialized_block( ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:414:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(local_state._serializer.next_serialized_block( ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:417:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (serialized) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:419:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!cur_block.empty()) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:420:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(local_state._serializer.serialize_block( ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:420:** +6, including nesting penalty of 5, nesting level increased to 6 ```cpp RETURN_IF_ERROR(local_state._serializer.serialize_block( ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:423:** +1, nesting level increased to 4 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:448:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (moved) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:450:** +1, nesting level increased to 4 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:458:** +1, nesting level increased to 1 ```cpp } else if (_part_type == TPartitionType::RANDOM) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:462:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (!current_channel->is_receiver_eof()) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:464:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (current_channel->is_local()) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp if (status.is<ErrorCode::END_OF_FILE>()) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +1, nesting level increased to 5 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp } else { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +6, including nesting penalty of 5, nesting level increased to 6 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp RETURN_IF_ERROR(status); \ ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:466:** +7, including nesting penalty of 6, nesting level increased to 7 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp RETURN_IF_ERROR(status); \ ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:467:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:468:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(local_state._serializer.serialize_block( ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:468:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(local_state._serializer.serialize_block( ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp if (status.is<ErrorCode::END_OF_FILE>()) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +1, nesting level increased to 5 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp } else { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +6, including nesting penalty of 5, nesting level increased to 6 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp RETURN_IF_ERROR(status); \ ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:472:** +7, including nesting penalty of 6, nesting level increased to 7 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp RETURN_IF_ERROR(status); \ ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:478:** +1, nesting level increased to 1 ```cpp } else if (_part_type == TPartitionType::HASH_PARTITIONED || ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:483:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:483:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:490:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(channel_add_rows( ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:490:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(channel_add_rows( ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:502:** +1, nesting level increased to 1 ```cpp } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:508:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(local_state._send_new_partition_batch()); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:508:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(local_state._send_new_partition_batch()); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:515:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (input_rows > 0) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:520:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution( ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:520:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(local_state._row_distribution.generate_rows_distribution( ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:526:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp for (int idx = 0; idx < row_ids.size(); ++idx) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:534:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (eos) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:536:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(local_state._send_new_partition_batch()); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:536:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(local_state._send_new_partition_batch()); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:540:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:540:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(channel_add_rows_with_idx(state, local_state.channels, num_channels, ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:550:** +1, nesting level increased to 1 ```cpp } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:557:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block)); ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:557:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, block)); ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:561:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(channel_add_rows_with_idx( ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:561:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(channel_add_rows_with_idx( ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:572:** +1, nesting level increased to 1 ```cpp } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:577:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (!current_channel->is_receiver_eof()) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:579:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (current_channel->is_local()) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp if (status.is<ErrorCode::END_OF_FILE>()) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +1, nesting level increased to 5 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp } else { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +6, including nesting penalty of 5, nesting level increased to 6 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp RETURN_IF_ERROR(status); \ ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:581:** +7, including nesting penalty of 6, nesting level increased to 7 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp RETURN_IF_ERROR(status); \ ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:582:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:583:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(local_state._serializer.serialize_block( ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:583:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(local_state._serializer.serialize_block( ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:228:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:229:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp if (status.is<ErrorCode::END_OF_FILE>()) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +1, nesting level increased to 5 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:231:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp } else { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +6, including nesting penalty of 5, nesting level increased to 6 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp RETURN_IF_ERROR(status); \ ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:587:** +7, including nesting penalty of 6, nesting level increased to 7 ```cpp HANDLE_CHANNEL_STATUS(state, current_channel, status); ^ ``` **be/src/vec/sink/vdata_stream_sender.h:232:** expanded from macro 'HANDLE_CHANNEL_STATUS' ```cpp RETURN_IF_ERROR(status); \ ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:593:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (_writer_count < local_state.channels.size()) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:594:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (_data_processed >= ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:601:** +1, nesting level increased to 1 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:608:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (eos) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:610:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp for (int i = 0; i < local_state.channels.size(); ++i) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:612:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (!st.ok() && final_st.ok()) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:612:** +1 ```cpp if (!st.ok() && final_st.ok()) { ^ ``` **be/src/pipeline/exec/exchange_sink_operator.cpp:616:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (local_state._sink_buffer) { ^ ``` </details> ########## be/src/pipeline/exec/exchange_sink_operator.h: ########## @@ -103,30 +106,30 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { void set_reach_limit() { _reach_limit = true; }; [[nodiscard]] int sender_id() const { return _sender_id; } - + [[nodiscard]] int be_number() const { return _state->be_number(); } std::string name_suffix() override; segment_v2::CompressionTypePB compression_type() const; std::string debug_string(int indentation_level) const override; static Status empty_callback_function(void* sender, TCreatePartitionResult* result) { return Status::OK(); } Status _send_new_partition_batch(); - std::vector<std::shared_ptr<vectorized::Channel>> channels; - int current_channel_idx {0}; // index of current channel to send to if _random == true - bool only_local_exchange {false}; - - void on_channel_finished(InstanceLoId channel_id); + std::vector<vectorized::PipChannel*> channels; + std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs; + int current_channel_idx; // index of current channel to send to if _random == true Review Comment: warning: use default member initializer for 'current_channel_idx' [modernize-use-default-member-init] be/src/pipeline/exec/exchange_sink_operator.h:56: ```diff - current_channel_idx(0), + , ``` ```suggestion int current_channel_idx{0}; // index of current channel to send to if _random == true ``` ########## be/src/pipeline/exec/exchange_sink_operator.h: ########## @@ -103,30 +106,30 @@ void set_reach_limit() { _reach_limit = true; }; [[nodiscard]] int sender_id() const { return _sender_id; } - + [[nodiscard]] int be_number() const { return _state->be_number(); } std::string name_suffix() override; segment_v2::CompressionTypePB compression_type() const; std::string debug_string(int indentation_level) const override; static Status empty_callback_function(void* sender, TCreatePartitionResult* result) { return Status::OK(); } Status _send_new_partition_batch(); - std::vector<std::shared_ptr<vectorized::Channel>> channels; - int current_channel_idx {0}; // index of current channel to send to if _random == true - bool only_local_exchange {false}; - - void on_channel_finished(InstanceLoId channel_id); + std::vector<vectorized::PipChannel*> channels; + std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs; + int current_channel_idx; // index of current channel to send to if _random == true + bool only_local_exchange; Review Comment: warning: use default member initializer for 'only_local_exchange' [modernize-use-default-member-init] be/src/pipeline/exec/exchange_sink_operator.h:57: ```diff - only_local_exchange(false), + , ``` ```suggestion bool only_local_exchange{false}; ``` ########## be/src/pipeline/exec/exchange_sink_buffer.cpp: ########## @@ -209,30 +222,40 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { Review Comment: warning: function '_send_rpc' has cognitive complexity of 76 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { ^ ``` <details> <summary>Additional context</summary> **be/src/pipeline/exec/exchange_sink_buffer.cpp:228:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (_is_finishing) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:235:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp while (!q.empty()) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:250:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (request.block && !request.block->column_metas().empty()) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:250:** +1 ```cpp if (request.block && !request.block->column_metas().empty()) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:253:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (!request.exec_status.ok()) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:260:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (config::exchange_sink_ignore_eovercrowded) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:263:** nesting level increased to 2 ```cpp send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()]( ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:266:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (task_lock == nullptr) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:274:** nesting level increased to 2 ```cpp send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:279:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (task_lock == nullptr) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:287:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (s.is<ErrorCode::END_OF_FILE>()) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:289:** +1, nesting level increased to 3 ```cpp } else if (!s.ok()) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:292:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:294:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!s) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:305:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (enable_http_send_block(*brpc_request)) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:306:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:306:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:309:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:314:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (request.block) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:319:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (_total_queue_size <= _queue_capacity) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:324:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (_keep_order) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:329:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp while (!broadcast_q.empty()) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:344:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (request.block_holder->get_block() && ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:344:** +1 ```cpp if (request.block_holder->get_block() && ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:352:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (config::exchange_sink_ignore_eovercrowded) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:355:** nesting level increased to 2 ```cpp send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()]( ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:358:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (task_lock == nullptr) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:366:** nesting level increased to 2 ```cpp send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:371:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (task_lock == nullptr) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:379:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (s.is<ErrorCode::END_OF_FILE>()) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:381:** +1, nesting level increased to 3 ```cpp } else if (!s.ok()) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:384:** +1, nesting level increased to 3 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:386:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!s) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:397:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (enable_http_send_block(*brpc_request)) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:398:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), ^ ``` **be/src/common/status.h:629:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:398:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), ^ ``` **be/src/common/status.h:631:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:401:** +1, nesting level increased to 2 ```cpp } else { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:406:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (request.block_holder->get_block()) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:410:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (_keep_order) { ^ ``` **be/src/pipeline/exec/exchange_sink_buffer.cpp:414:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (is_empty) { ^ ``` </details> ########## be/src/vec/sink/vdata_stream_sender.h: ########## @@ -119,106 +131,98 @@ class Channel { Status init(RuntimeState* state); Status open(RuntimeState* state); - Status send_local_block(Block* block, bool eos, bool can_be_moved); - // Flush buffered rows and close channel. This function don't wait the response - // of close operation, client should call close_wait() to finish channel's close. - // We split one close operation into two phases in order to make multiple channels - // can run parallel. - Status close(RuntimeState* state); + Status send_local_block(Status exec_status, bool eos = false); + + Status send_local_block(Block* block, bool can_be_moved); + + // Get close wait's response, to finish channel close operation. + Status close_wait(RuntimeState* state); + + int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; } + + PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; } std::string get_fragment_instance_id_str() { - UniqueId uid(_fragment_instance_id); + UniqueId uid(_dest_fragment_instance_id); return uid.to_string(); } bool is_local() const { return _is_local; } + virtual void ch_roll_pb_block(); + bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); } void set_receiver_eof(Status st) { _receiver_status = st; } - int64_t mem_usage() const; - - // Asynchronously sends a block - // Returns the status of the most recently finished transmit_data - // rpc (or OK if there wasn't one that hasn't been reported yet). - // if batch is nullptr, send the eof packet - Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false); - Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos = false); - - Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) { - if (_fragment_instance_id.lo == -1) { - return Status::OK(); - } - - bool serialized = false; - if (_pblock == nullptr) { - _pblock = std::make_unique<PBlock>(); - } - RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos, - &rows)); - if (serialized) { - RETURN_IF_ERROR(_send_current_block(eos)); - } - - return Status::OK(); - } - - void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { - _buffer = buffer; - _buffer->register_sink(_fragment_instance_id); - } - - std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback( - InstanceLoId id, bool eos) { - if (!_send_callback) { - _send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared(); - } else { - _send_callback->cntl_->Reset(); +protected: + bool _recvr_is_valid() { + if (_local_recvr && !_local_recvr->is_closed()) { + return true; Review Comment: warning: redundant boolean literal in conditional return statement [readability-simplify-boolean-expr] ```cpp return true; ^ ``` ########## be/src/vec/sink/vdata_stream_sender.h: ########## @@ -119,106 +131,98 @@ Status init(RuntimeState* state); Status open(RuntimeState* state); - Status send_local_block(Block* block, bool eos, bool can_be_moved); - // Flush buffered rows and close channel. This function don't wait the response - // of close operation, client should call close_wait() to finish channel's close. - // We split one close operation into two phases in order to make multiple channels - // can run parallel. - Status close(RuntimeState* state); + Status send_local_block(Status exec_status, bool eos = false); + + Status send_local_block(Block* block, bool can_be_moved); + + // Get close wait's response, to finish channel close operation. + Status close_wait(RuntimeState* state); + + int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; } + + PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; } std::string get_fragment_instance_id_str() { - UniqueId uid(_fragment_instance_id); + UniqueId uid(_dest_fragment_instance_id); return uid.to_string(); } bool is_local() const { return _is_local; } + virtual void ch_roll_pb_block(); + bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); } void set_receiver_eof(Status st) { _receiver_status = st; } - int64_t mem_usage() const; - - // Asynchronously sends a block - // Returns the status of the most recently finished transmit_data - // rpc (or OK if there wasn't one that hasn't been reported yet). - // if batch is nullptr, send the eof packet - Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false); - Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos = false); - - Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) { - if (_fragment_instance_id.lo == -1) { - return Status::OK(); - } - - bool serialized = false; - if (_pblock == nullptr) { - _pblock = std::make_unique<PBlock>(); - } - RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos, - &rows)); - if (serialized) { - RETURN_IF_ERROR(_send_current_block(eos)); - } - - return Status::OK(); - } - - void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { - _buffer = buffer; - _buffer->register_sink(_fragment_instance_id); - } - - std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback( - InstanceLoId id, bool eos) { - if (!_send_callback) { - _send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared(); - } else { - _send_callback->cntl_->Reset(); +protected: + bool _recvr_is_valid() { + if (_local_recvr && !_local_recvr->is_closed()) { + return true; } - _send_callback->init(id, eos); - return _send_callback; + _receiver_status = Status::EndOfFile( + "local data stream receiver closed"); // local data stream receiver closed + return false; } - std::shared_ptr<pipeline::Dependency> get_local_channel_dependency(); - -protected: - Status _send_local_block(bool eos); - Status _send_current_block(bool eos); - - Status _recvr_status() const { - if (_local_recvr && !_local_recvr->is_closed()) { + Status _wait_last_brpc() { + SCOPED_TIMER(_parent->brpc_wait_timer()); + if (_send_remote_block_callback == nullptr) { return Status::OK(); } - return Status::EndOfFile( - "local data stream receiver closed"); // local data stream receiver closed + _send_remote_block_callback->join(); + if (_send_remote_block_callback->cntl_->Failed()) { + std::string err = fmt::format( + "failed to send brpc batch, error={}, error_text={}, client: {}, " + "latency = {}", + berror(_send_remote_block_callback->cntl_->ErrorCode()), + _send_remote_block_callback->cntl_->ErrorText(), + BackendOptions::get_localhost(), + _send_remote_block_callback->cntl_->latency_us()); + LOG(WARNING) << err; + return Status::RpcError(err); + } + _receiver_status = Status::create(_send_remote_block_callback->response_->status()); + return _receiver_status; } - pipeline::ExchangeSinkLocalState* _parent = nullptr; + Parent* _parent = nullptr; - const TUniqueId _fragment_instance_id; + const RowDescriptor& _row_desc; + const TUniqueId _dest_fragment_instance_id; PlanNodeId _dest_node_id; - bool _closed {false}; - bool _need_close {false}; + + // the number of RowBatch.data bytes sent successfully + int64_t _num_data_bytes_sent {}; + int64_t _packet_seq {}; + + bool _need_close; + bool _closed; Review Comment: warning: use default member initializer for '_closed' [modernize-use-default-member-init] be/src/vec/sink/vdata_stream_sender.h:115: ```diff - _closed(false), + , ``` ```suggestion bool _closed{false}; ``` ########## be/src/vec/sink/vdata_stream_sender.h: ########## @@ -230,5 +234,88 @@ } \ } while (0) +class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> { +public: + PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor& row_desc, + const TNetworkAddress& brpc_dest, const TUniqueId& dest_fragment_instance_id, + PlanNodeId dest_node_id) + : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc, brpc_dest, + dest_fragment_instance_id, dest_node_id) { + ch_roll_pb_block(); + } + + ~PipChannel() override { delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; } Review Comment: warning: use '= default' to define a trivial destructor [modernize-use-equals-default] ```cpp ~PipChannel() override { delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; } ^ ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org