morningman commented on code in PR #9803: URL: https://github.com/apache/incubator-doris/pull/9803#discussion_r888771409
########## be/src/exec/tablet_sink.cpp: ########## @@ -515,14 +513,31 @@ void NodeChannel::try_send_batch(RuntimeState* state) { CHECK(_pending_batches_num == 0) << _pending_batches_num; } - if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch()) { - request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest, - ReusableClosure<PTabletWriterAddBatchResult>>( - &request, _tuple_data_buffer, _add_batch_closure); + if (config::transfer_large_data_by_brpc && request.has_row_batch() && Review Comment: Users can change this config at runtime, so it is better to save its value when creating the tablet sink. Otherwise, it may change during the loading process. ########## be/src/runtime/data_stream_sender.cpp: ########## @@ -149,12 +156,28 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) { _closure->ref(); _closure->cntl.set_timeout_ms(_brpc_timeout_ms); - if (_parent->_transfer_data_by_brpc_attachment && _brpc_request.has_row_batch()) { - request_row_batch_transfer_attachment<PTransmitDataParams, - RefCountClosure<PTransmitDataResult>>( - &_brpc_request, _parent->_tuple_data_buffer, _closure); + if (config::transfer_large_data_by_brpc && _brpc_request.has_row_batch() && Review Comment: Save `transfer_large_data_by_brpc` when creating the data stream sender. ########## be/src/service/internal_service.cpp: ########## @@ -66,19 +66,53 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_b PTransmitDataResult* response, google::protobuf::Closure* done) { SCOPED_SWITCH_BTHREAD(); - VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) - << " node=" << request->node_id(); + // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl); + + _transmit_data(cntl_base, request, response, done, Status::OK()); +} + +void 
PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* cntl_base, + const PEchoRequest* request, + PTransmitDataResult* response, + google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); + PTransmitDataParams* request_raw = new PTransmitDataParams(); Review Comment: Memory leak? ########## be/src/service/internal_service.cpp: ########## @@ -122,20 +156,44 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { + // TODO(zxy) delete in 1.2 version + brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); + attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl); + + _tablet_writer_add_block(cntl_base, request, response, done); +} + +void PInternalServiceImpl::tablet_writer_add_block_by_http( + google::protobuf::RpcController* cntl_base, const ::doris::PEchoRequest* request, + PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { + PTabletWriterAddBlockRequest* request_raw = new PTabletWriterAddBlockRequest(); Review Comment: Memory leak? 
########## be/src/service/internal_service.cpp: ########## @@ -66,19 +66,53 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_b PTransmitDataResult* response, google::protobuf::Closure* done) { SCOPED_SWITCH_BTHREAD(); - VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) - << " node=" << request->node_id(); + // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl); + + _transmit_data(cntl_base, request, response, done, Status::OK()); +} + +void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* cntl_base, + const PEchoRequest* request, + PTransmitDataResult* response, + google::protobuf::Closure* done) { + SCOPED_SWITCH_BTHREAD(); + PTransmitDataParams* request_raw = new PTransmitDataParams(); + brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); + Status st = attachment_extract_request_contain_tuple<PTransmitDataParams>(request_raw, cntl); + _transmit_data(cntl_base, request_raw, response, done, st); +} + +void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base, + const PTransmitDataParams* request, + PTransmitDataResult* response, + google::protobuf::Closure* done, Status extract_st) { Review Comment: ```suggestion google::protobuf::Closure* done, const Status& extract_st) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org