This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3c6676ce2ebc09783d6710943d202efa66a912cd
Author: Pxl <pxl...@qq.com>
AuthorDate: Mon Jul 24 09:25:32 2023 +0800

    [Improvement](pipeline) support send eos on local exchange and remove some 
unused code (#22086)
    
    support send eos on local exchange and remove some unused code
---
 be/src/pipeline/exec/exchange_source_operator.cpp |  1 -
 be/src/runtime/fragment_mgr.cpp                   |  2 +-
 be/src/service/brpc_service.cpp                   |  2 +-
 be/src/service/internal_service.cpp               | 17 ++-----
 be/src/service/internal_service.h                 |  2 +-
 be/src/util/proto_util.h                          | 56 -----------------------
 be/src/vec/exec/vexchange_node.h                  |  1 -
 be/src/vec/sink/vdata_stream_sender.cpp           | 16 +++++--
 be/src/vec/sink/vdata_stream_sender.h             | 12 ++---
 be/test/vec/exec/vtablet_sink_test.cpp            |  2 -
 be/test/vec/runtime/vdata_stream_test.cpp         |  2 -
 11 files changed, 24 insertions(+), 89 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 1dd20f929c..84e4288dce 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -32,7 +32,6 @@ bool ExchangeSourceOperator::can_read() {
 }
 
 bool ExchangeSourceOperator::is_pending_finish() const {
-    // TODO HappenLee
     return false;
 }
 } // namespace doris::pipeline
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index f8e7750151..cfb682b75d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -138,7 +138,7 @@ public:
     }
 
     // Update status of this fragment execute
-    Status update_status(Status status) {
+    Status update_status(const Status& status) {
         std::lock_guard<std::mutex> l(_status_lock);
         if (!status.ok() && _exec_status.ok()) {
             _exec_status = status;
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 80138b99ee..2406d08b1e 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -47,7 +47,7 @@ BRpcService::BRpcService(ExecEnv* exec_env) : 
_exec_env(exec_env), _server(new b
     brpc::FLAGS_socket_max_unwritten_bytes = 
config::brpc_socket_max_unwritten_bytes;
 }
 
-BRpcService::~BRpcService() {}
+BRpcService::~BRpcService() = default;
 
 Status BRpcService::start(int port, int num_threads) {
     // Add service
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index c38416a593..924e829539 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -328,12 +328,7 @@ void 
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
                                                    
PTabletWriterAddBlockResult* response,
                                                    google::protobuf::Closure* 
done) {
     bool ret = _heavy_work_pool.try_offer([this, controller, request, 
response, done]() {
-        // TODO(zxy) delete in 1.2 version
-        google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
-        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-        
attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
-
-        _tablet_writer_add_block(controller, request, response, new_done);
+        _tablet_writer_add_block(controller, request, response, done);
     });
     if (!ret) {
         LOG(WARNING) << "fail to offer request to the work pool";
@@ -1024,13 +1019,9 @@ void 
PInternalServiceImpl::transmit_block(google::protobuf::RpcController* contr
                                           google::protobuf::Closure* done) {
     int64_t receive_time = GetCurrentTimeNanos();
     response->set_receive_time(receive_time);
-    bool ret = _heavy_work_pool.try_offer([this, controller, request, 
response, done]() {
-        // TODO(zxy) delete in 1.2 version
-        google::protobuf::Closure* new_done = new 
NewHttpClosure<PTransmitDataParams>(done);
-        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-        attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
-
-        _transmit_block(controller, request, response, new_done, Status::OK());
+    PriorityThreadPool& pool = request->has_block() ? _heavy_work_pool : 
_light_work_pool;
+    bool ret = pool.try_offer([this, controller, request, response, done]() {
+        _transmit_block(controller, request, response, done, Status::OK());
     });
     if (!ret) {
         LOG(WARNING) << "fail to offer request to the work pool";
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index 0d568e2b84..823f29504b 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -41,7 +41,7 @@ class PHandShakeResponse;
 class PInternalServiceImpl : public PBackendService {
 public:
     PInternalServiceImpl(ExecEnv* exec_env);
-    virtual ~PInternalServiceImpl();
+    ~PInternalServiceImpl() override;
 
     void transmit_data(::google::protobuf::RpcController* controller,
                        const ::doris::PTransmitDataParams* request,
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index 2741092d80..3c583b867c 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -86,62 +86,6 @@ Status transmit_block_http(RuntimeState* state, Closure* 
closure, PTransmitDataP
     return Status::OK();
 }
 
-// TODO(zxy) delete in v1.3 version
-// Transfer RowBatch in ProtoBuf Request to Controller Attachment.
-// This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
-// and it is expected that performance can be improved.
-template <typename Params, typename Closure>
-void request_row_batch_transfer_attachment(Params* brpc_request, const 
std::string& tuple_data,
-                                           Closure* closure) {
-    auto row_batch = brpc_request->mutable_row_batch();
-    row_batch->set_tuple_data("");
-    brpc_request->set_transfer_by_attachment(true);
-    butil::IOBuf attachment;
-    attachment.append(tuple_data);
-    closure->cntl.request_attachment().swap(attachment);
-}
-
-// TODO(zxy) delete in v1.3 version
-// Transfer Block in ProtoBuf Request to Controller Attachment.
-// This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
-// and it is expected that performance can be improved.
-template <typename Params, typename Closure>
-void request_block_transfer_attachment(Params* brpc_request, const 
std::string& column_values,
-                                       Closure* closure) {
-    auto block = brpc_request->mutable_block();
-    block->set_column_values("");
-    brpc_request->set_transfer_by_attachment(true);
-    butil::IOBuf attachment;
-    attachment.append(column_values);
-    closure->cntl.request_attachment().swap(attachment);
-}
-
-// TODO(zxy) delete in v1.3 version
-// Controller Attachment transferred to RowBatch in ProtoBuf Request.
-template <typename Params>
-void attachment_transfer_request_row_batch(const Params* brpc_request, 
brpc::Controller* cntl) {
-    Params* req = const_cast<Params*>(brpc_request);
-    if (req->has_row_batch() && req->transfer_by_attachment()) {
-        auto rb = req->mutable_row_batch();
-        const butil::IOBuf& io_buf = cntl->request_attachment();
-        CHECK(io_buf.size() > 0) << io_buf.size() << ", row num: " << 
req->row_batch().num_rows();
-        io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0);
-    }
-}
-
-// TODO(zxy) delete in v1.3 version
-// Controller Attachment transferred to Block in ProtoBuf Request.
-template <typename Params>
-void attachment_transfer_request_block(const Params* brpc_request, 
brpc::Controller* cntl) {
-    Params* req = const_cast<Params*>(brpc_request);
-    if (req->has_block() && req->transfer_by_attachment()) {
-        auto block = req->mutable_block();
-        const butil::IOBuf& io_buf = cntl->request_attachment();
-        CHECK(io_buf.size() > 0) << io_buf.size();
-        io_buf.copy_to(block->mutable_column_values(), io_buf.size(), 0);
-    }
-}
-
 template <typename Params, typename Closure>
 Status request_embed_attachment(Params* brpc_request, const std::string& data, 
Closure* closure) {
     butil::IOBuf attachment;
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index 58b61dc9af..c4f083dda4 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -58,7 +58,6 @@ public:
     Status collect_query_statistics(QueryStatistics* statistics) override;
     Status close(RuntimeState* state) override;
 
-    // Status collect_query_statistics(QueryStatistics* statistics) override;
     void set_num_senders(int num_senders) { _num_senders = num_senders; }
 
 private:
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index b49c6ec92e..82050276d2 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -134,7 +134,7 @@ Status Channel::send_local_block(bool eos) {
         return Status::OK();
     } else {
         _mutable_block.reset();
-        return receiver_status_;
+        return _receiver_status;
     }
 }
 
@@ -147,7 +147,7 @@ Status Channel::send_local_block(Block* block) {
         _local_recvr->add_block(block, _parent->_sender_id, false);
         return Status::OK();
     } else {
-        return receiver_status_;
+        return _receiver_status;
     }
 }
 
@@ -256,8 +256,8 @@ Status Channel::close_internal() {
     VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
              << " dest_node=" << _dest_node_id
              << " #rows= " << ((_mutable_block == nullptr) ? 0 : 
_mutable_block->rows())
-             << " receiver status: " << receiver_status_;
-    if (receiver_status_.is<ErrorCode::END_OF_FILE>()) {
+             << " receiver status: " << _receiver_status;
+    if (is_receiver_eof()) {
         _mutable_block.reset();
         return Status::OK();
     }
@@ -266,7 +266,13 @@ Status Channel::close_internal() {
         status = send_current_block(true);
     } else {
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
-        status = send_block((PBlock*)nullptr, true);
+        if (is_local()) {
+            if (_recvr_is_valid()) {
+                _local_recvr->remove_sender(_parent->_sender_id, _be_number);
+            }
+        } else {
+            status = send_block((PBlock*)nullptr, true);
+        }
     }
     // Don't wait for the last packet to finish, left it to close_wait.
     if (status.is<ErrorCode::END_OF_FILE>()) {
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 2fdecac6cb..2512a21475 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -294,16 +294,16 @@ public:
                _local_recvr->sender_queue_empty(_parent->_sender_id);
     }
 
-    bool is_receiver_eof() const { return 
receiver_status_.is<ErrorCode::END_OF_FILE>(); }
+    bool is_receiver_eof() const { return 
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
 
-    void set_receiver_eof(Status st) { receiver_status_ = st; }
+    void set_receiver_eof(Status st) { _receiver_status = st; }
 
 protected:
     bool _recvr_is_valid() {
         if (_local_recvr && !_local_recvr->is_closed()) {
             return true;
         }
-        receiver_status_ = Status::EndOfFile("local data stream receiver 
closed");
+        _receiver_status = Status::EndOfFile("local data stream receiver 
closed");
         return false;
     }
 
@@ -315,7 +315,7 @@ protected:
         auto cntl = &_closure->cntl;
         auto call_id = _closure->cntl.call_id();
         brpc::Join(call_id);
-        receiver_status_ = Status::create(_closure->result.status());
+        _receiver_status = Status::create(_closure->result.status());
         if (cntl->Failed()) {
             std::string err = fmt::format(
                     "failed to send brpc batch, error={}, error_text={}, 
client: {}, "
@@ -325,7 +325,7 @@ protected:
             LOG(WARNING) << err;
             return Status::RpcError(err);
         }
-        return receiver_status_;
+        return _receiver_status;
     }
 
     // Serialize _batch into _thrift_batch and send via send_batch().
@@ -358,7 +358,7 @@ protected:
     PTransmitDataParams _brpc_request;
     std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
     RefCountClosure<PTransmitDataResult>* _closure = nullptr;
-    Status receiver_status_;
+    Status _receiver_status;
     int32_t _brpc_timeout_ms = 500;
     // whether the dest can be treated as query statistics transfer chain.
     bool _is_transfer_chain;
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp 
b/be/test/vec/exec/vtablet_sink_test.cpp
index 15ee6b79a2..5e60181c8f 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -316,8 +316,6 @@ public:
             k_add_batch_status.to_protobuf(response->mutable_status());
 
             if (request->has_block() && _row_desc != nullptr) {
-                brpc::Controller* cntl = 
static_cast<brpc::Controller*>(controller);
-                
attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
                 vectorized::Block block(request->block());
 
                 for (size_t row_num = 0; row_num < block.rows(); ++row_num) {
diff --git a/be/test/vec/runtime/vdata_stream_test.cpp 
b/be/test/vec/runtime/vdata_stream_test.cpp
index 4c42bde016..954bf8f194 100644
--- a/be/test/vec/runtime/vdata_stream_test.cpp
+++ b/be/test/vec/runtime/vdata_stream_test.cpp
@@ -74,8 +74,6 @@ public:
                         const ::doris::PTransmitDataParams* request,
                         ::doris::PTransmitDataResult* response, 
::google::protobuf::Closure* done) {
         // stream_mgr->transmit_block(request, &done);
-        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-        attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
         // The response is accessed when done->Run is called in 
transmit_block(),
         // give response a default value to avoid null pointers in high 
concurrency.
         Status st;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to