Gabriel39 commented on code in PR #41968:
URL: https://github.com/apache/doris/pull/41968#discussion_r1805790254


##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -177,151 +140,40 @@ class Channel {
 
     bool is_local() const { return _is_local; }
 
-    virtual void ch_roll_pb_block();
-
     bool is_receiver_eof() const { return 
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
 
     void set_receiver_eof(Status st) { _receiver_status = st; }
 
-protected:
-    bool _recvr_is_valid() {
-        if (_local_recvr && !_local_recvr->is_closed()) {
-            return true;
-        }
-        _receiver_status = Status::EndOfFile(
-                "local data stream receiver closed"); // local data stream 
receiver closed
-        return false;
-    }
-
-    Status _wait_last_brpc() {
-        SCOPED_TIMER(_parent->brpc_wait_timer());
-        if (_send_remote_block_callback == nullptr) {
-            return Status::OK();
-        }
-        _send_remote_block_callback->join();
-        if (_send_remote_block_callback->cntl_->Failed()) {
-            std::string err = fmt::format(
-                    "failed to send brpc batch, error={}, error_text={}, 
client: {}, "
-                    "latency = {}",
-                    berror(_send_remote_block_callback->cntl_->ErrorCode()),
-                    _send_remote_block_callback->cntl_->ErrorText(),
-                    BackendOptions::get_localhost(),
-                    _send_remote_block_callback->cntl_->latency_us());
-            LOG(WARNING) << err;
-            return Status::RpcError(err);
-        }
-        _receiver_status = 
Status::create(_send_remote_block_callback->response_->status());
-        return _receiver_status;
-    }
-
-    Status close_internal(Status exec_status);
-
-    Parent* _parent = nullptr;
-
-    const RowDescriptor& _row_desc;
-    const TUniqueId _fragment_instance_id;
-    PlanNodeId _dest_node_id;
-
-    // the number of RowBatch.data bytes sent successfully
-    int64_t _num_data_bytes_sent {};
-    int64_t _packet_seq {};
-
-    bool _need_close;
-    bool _closed;
-    int _be_number;
-
-    TNetworkAddress _brpc_dest_addr;
-
-    PUniqueId _finst_id;
-    PUniqueId _query_id;
-    PBlock _pb_block;
-    std::shared_ptr<PTransmitDataParams> _brpc_request = nullptr;
-    std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
-    std::shared_ptr<DummyBrpcCallback<PTransmitDataResult>> 
_send_remote_block_callback;
-    Status _receiver_status;
-    int32_t _brpc_timeout_ms = 500;
-    RuntimeState* _state = nullptr;
-
-    bool _is_local;
-    std::shared_ptr<VDataStreamRecvr> _local_recvr;
-    // serialized blocks for broadcasting; we need two so we can write
-    // one while the other one is still being sent.
-    // Which is for same reason as `_cur_pb_block`, `_pb_block1` and 
`_pb_block2`
-    // in VDataStreamSender.
-    PBlock* _ch_cur_pb_block = nullptr;
-    PBlock _ch_pb_block1;
-    PBlock _ch_pb_block2;
-
-    BlockSerializer<Parent> _serializer;
-};
-
-#define HANDLE_CHANNEL_STATUS(state, channel, status)    \
-    do {                                                 \
-        if (status.is<ErrorCode::END_OF_FILE>()) {       \
-            _handle_eof_channel(state, channel, status); \
-        } else {                                         \
-            RETURN_IF_ERROR(status);                     \
-        }                                                \
-    } while (0)
-
-class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> {
-public:
-    PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor& 
row_desc,
-               const TNetworkAddress& brpc_dest, const TUniqueId& 
fragment_instance_id,
-               PlanNodeId dest_node_id)
-            : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc, 
brpc_dest,
-                                                        fragment_instance_id, 
dest_node_id) {
-        ch_roll_pb_block();
-    }
-
-    ~PipChannel() override { delete 
Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
-
     int64_t mem_usage() const;
 
-    void ch_roll_pb_block() override {
-        // We have two choices here.
-        // 1. Use a PBlock pool and fetch an available PBlock if we need one. 
In this way, we can
-        //    reuse the memory, but we have to use a lock to synchronize.
-        // 2. Create a new PBlock every time. In this way we don't need a lock 
but have to allocate
-        //    new memory.
-        // Now we use the second way.
-        Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new 
PBlock();
-    }
-
     // Asynchronously sends a block
     // Returns the status of the most recently finished transmit_data
     // rpc (or OK if there wasn't one that hasn't been reported yet).
     // if batch is nullptr, send the eof packet
-    Status send_remote_block(PBlock* block, bool eos = false,
-                             Status exec_status = Status::OK()) override;
+    Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false,
+                             Status exec_status = Status::OK());
 
-    Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
-                                bool eos = false) override;
+    Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, 
bool eos = false);
 
-    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) 
override {
-        if 
(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) {
+    Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) 
{
+        if (_fragment_instance_id.lo == -1) {
             return Status::OK();
         }
 
         bool serialized = false;
         _pblock = std::make_unique<PBlock>();

Review Comment:
   DCHECK(_pblock == nullptr)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to