This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 6834fb23ca [fix](s3) fix s3 Temp file may write failed because of has no space on disk (#9421)
6834fb23ca is described below

commit 6834fb23cadb7c065ac16aa0e27b73060a3951ed
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Mon May 9 09:28:43 2022 +0800

    [fix](s3) fix s3 Temp file may write failed because of has no space on disk (#9421)
---
 be/src/common/config.h              |   1 -
 be/src/exec/s3_writer.cpp           |  36 +++++--
 be/src/runtime/row_batch.cpp        |  54 ++++++----
 be/src/runtime/tmp_file_mgr.cc      |   9 ++
 be/src/runtime/tmp_file_mgr.h       |   3 +
 be/src/service/brpc_service.cpp     |   3 +-
 be/src/service/internal_service.cpp | 201 +++++++++++++++---------------------
 be/src/service/internal_service.h   |   3 +-
 8 files changed, 159 insertions(+), 151 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 28714c6b24..1c4c91160e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -737,7 +737,6 @@ CONF_Validator(string_type_length_soft_limit_bytes,
 
 // used for olap scanner to save memory, when the size of unused_object_pool
 // is greater than object_pool_buffer_size, release the object in the unused_object_pool.
 CONF_Int32(object_pool_buffer_size, "100");
-
 } // namespace config
 } // namespace doris

diff --git a/be/src/exec/s3_writer.cpp b/be/src/exec/s3_writer.cpp
index 17e64a4e28..8b44c621d5 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/exec/s3_writer.cpp
@@ -23,6 +23,8 @@
 #include <aws/s3/model/PutObjectRequest.h>
 
 #include "common/logging.h"
+#include "runtime/exec_env.h"
+#include "runtime/tmp_file_mgr.h"
 #include "service/backend_options.h"
 #include "util/s3_uri.h"
 #include "util/s3_util.h"
@@ -41,10 +43,15 @@ S3Writer::S3Writer(const std::map<std::string, std::string>& properties, const s
         : _properties(properties),
           _path(path),
           _uri(path),
-          _client(ClientFactory::instance().create(_properties)),
-          _temp_file(std::make_shared<Aws::Utils::TempFile>(
-                  std::ios_base::binary | std::ios_base::trunc | std::ios_base::in |
-                  std::ios_base::out)) {
+          _client(ClientFactory::instance().create(_properties)) {
+    std::string tmp_path = ExecEnv::GetInstance()->tmp_file_mgr()->get_tmp_dir_path();
+    LOG(INFO) << "init aws s3 client with tmp path " << tmp_path;
+    if (tmp_path.at(tmp_path.size() - 1) != '/') {
+        tmp_path.append("/");
+    }
+    _temp_file = std::make_shared<Aws::Utils::TempFile>(
+            tmp_path.c_str(), ".doris_tmp",
+            std::ios_base::binary | std::ios_base::trunc | std::ios_base::in | std::ios_base::out);
     DCHECK(_client) << "init aws s3 client error.";
 }
 
@@ -78,13 +85,19 @@ Status S3Writer::write(const uint8_t* buf, size_t buf_len, size_t* written_len)
         return Status::OK();
     }
     if (!_temp_file) {
-        return Status::BufferAllocFailed("The internal temporary file is not writable. at " +
-                                         BackendOptions::get_localhost());
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::BufferAllocFailed(
+                        fmt::format("The internal temporary file is not writable for {}. at {}",
+                                    strerror(errno), BackendOptions::get_localhost())),
+                "write temp file error");
     }
     _temp_file->write(reinterpret_cast<const char*>(buf), buf_len);
     if (!_temp_file->good()) {
-        return Status::BufferAllocFailed("Could not append to the internal temporary file. at " +
-                                         BackendOptions::get_localhost());
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::BufferAllocFailed(
+                        fmt::format("Could not append to the internal temporary file for {}. at {}",
+                                    strerror(errno), BackendOptions::get_localhost())),
+                "write temp file error");
     }
     *written_len = buf_len;
     return Status::OK();
@@ -100,8 +113,11 @@ Status S3Writer::close() {
 
 Status S3Writer::_sync() {
     if (!_temp_file) {
-        return Status::BufferAllocFailed("The internal temporary file is not writable. at " +
-                                         BackendOptions::get_localhost());
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::BufferAllocFailed(
+                        fmt::format("The internal temporary file is not writable for {}. at {}",
+                                    strerror(errno), BackendOptions::get_localhost())),
+                "write temp file error");
     }
     CHECK_S3_CLIENT(_client);
     Aws::S3::Model::PutObjectRequest request;
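
The constructor change above stages S3 uploads in one of the BE's configured scratch directories instead of the system default (usually /tmp), so a full system partition no longer breaks uploads. A standalone sketch of the placement logic, assuming the aws-sdk-cpp TempFile(prefix, suffix, openmode) overload used in the hunk; make_scratch_temp_file is a hypothetical name:

    #include <aws/core/utils/FileSystemUtils.h>
    #include <memory>
    #include <string>

    // Stage data in a caller-chosen scratch dir rather than the process default.
    // TempFile appends a random component to the prefix, so the prefix must end
    // with '/' to be treated as a directory.
    std::shared_ptr<Aws::Utils::TempFile> make_scratch_temp_file(std::string tmp_path) {
        if (tmp_path.empty() || tmp_path.back() != '/') {
            tmp_path.append("/");
        }
        return std::make_shared<Aws::Utils::TempFile>(
                tmp_path.c_str(), ".doris_tmp",
                std::ios_base::binary | std::ios_base::trunc | std::ios_base::in |
                        std::ios_base::out);
    }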
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index f8727abe6b..54b2e7680e 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -157,7 +157,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch)
         for (auto slot : desc->string_slots()) {
             DCHECK(slot->type().is_string_type());
             StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
-            int offset = convert_to<int>(string_val->ptr);
+            int64_t offset = convert_to<int64_t>(string_val->ptr);
             string_val->ptr = tuple_data + offset;
 
             // Why we do this mask? Field len of StringValue is changed from int to size_t in
@@ -225,10 +225,10 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
     // is_compressed
     output_batch->set_is_compressed(false);
     // tuple data
-    size_t size = total_byte_size();
+    size_t tuple_byte_size = total_byte_size();
     std::string* mutable_tuple_data = nullptr;
     if (allocated_buf != nullptr) {
-        allocated_buf->resize(size);
+        allocated_buf->resize(tuple_byte_size);
         // all tuple data will be written in the allocated_buf
         // instead of tuple_data in PRowBatch
         mutable_tuple_data = allocated_buf;
@@ -236,7 +236,7 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
         output_batch->set_tuple_data("");
     } else {
         mutable_tuple_data = output_batch->mutable_tuple_data();
-        mutable_tuple_data->resize(size);
+        mutable_tuple_data->resize(tuple_byte_size);
     }
 
     // Copy tuple data, including strings, into output_batch (converting string
@@ -261,37 +261,51 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
             mutable_tuple_offsets->Add((int32_t)offset);
             mutable_new_tuple_offsets->Add(offset);
             row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
-            CHECK_LE(offset, size);
+            CHECK_LE(offset, tuple_byte_size);
         }
     }
-    CHECK_EQ(offset, size) << "offset: " << offset << " vs. size: " << size;
-
-    if (config::compress_rowbatches && size > 0) {
-        // Try compressing tuple_data to _compression_scratch, swap if compressed data is
-        // smaller
-        uint32_t max_compressed_size = snappy::MaxCompressedLength(size);
-
-        if (_compression_scratch.size() < max_compressed_size) {
+    CHECK_EQ(offset, tuple_byte_size)
+            << "offset: " << offset << " vs. tuple_byte_size: " << tuple_byte_size;
+
+    size_t max_compressed_size = snappy::MaxCompressedLength(tuple_byte_size);
+    bool can_compress = config::compress_rowbatches && tuple_byte_size > 0;
+    if (can_compress) {
+        try {
+            // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
             _compression_scratch.resize(max_compressed_size);
+        } catch (const std::bad_alloc& e) {
+            can_compress = false;
+            LOG(WARNING) << "Try to alloc " << max_compressed_size
+                         << " bytes for compression scratch failed. " << e.what();
+        } catch (...) {
+            can_compress = false;
+            std::exception_ptr p = std::current_exception();
+            LOG(WARNING) << "Try to alloc " << max_compressed_size
+                         << " bytes for compression scratch failed. "
+                         << (p ? p.__cxa_exception_type()->name() : "null");
         }
-
+    }
+    if (can_compress) {
+        // Try compressing tuple_data to _compression_scratch, swap if compressed data is
+        // smaller
         size_t compressed_size = 0;
         char* compressed_output = _compression_scratch.data();
-        snappy::RawCompress(mutable_tuple_data->data(), size, compressed_output, &compressed_size);
-
-        if (LIKELY(compressed_size < size)) {
+        snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size, compressed_output,
+                            &compressed_size);
+        if (LIKELY(compressed_size < tuple_byte_size)) {
             _compression_scratch.resize(compressed_size);
             mutable_tuple_data->swap(_compression_scratch);
             output_batch->set_is_compressed(true);
         }
-        VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
+        VLOG_ROW << "uncompressed tuple_byte_size: " << tuple_byte_size
+                 << ", compressed size: " << compressed_size;
     }
 
     // return compressed and uncompressed size
     size_t pb_size = get_batch_size(*output_batch);
     if (allocated_buf == nullptr) {
-        *uncompressed_size = pb_size - mutable_tuple_data->size() + size;
+        *uncompressed_size = pb_size - mutable_tuple_data->size() + tuple_byte_size;
         *compressed_size = pb_size;
         if (pb_size > std::numeric_limits<int32_t>::max()) {
             // the protobuf has a hard limit of 2GB for serialized data.
@@ -302,7 +316,7 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
                     pb_size));
         }
     } else {
-        *uncompressed_size = pb_size + size;
+        *uncompressed_size = pb_size + tuple_byte_size;
         *compressed_size = pb_size + mutable_tuple_data->size();
     }
     return Status::OK();
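
The serialize() change above stops treating a failed resize of the snappy scratch buffer as fatal: if the large contiguous allocation throws, the batch is simply sent uncompressed. The same fallback in isolation (a minimal sketch using a plain std::string scratch and the public snappy API, not the Doris member buffer):

    #include <snappy.h>
    #include <iostream>
    #include <string>

    // Compress 'data' in place when a scratch buffer can be allocated and the
    // result is actually smaller; otherwise leave 'data' untouched.
    bool try_compress(std::string& data, std::string& scratch) {
        const size_t max_compressed_size = snappy::MaxCompressedLength(data.size());
        try {
            scratch.resize(max_compressed_size); // may throw std::bad_alloc for huge batches
        } catch (const std::bad_alloc& e) {
            std::cerr << "alloc of " << max_compressed_size << " bytes failed ("
                      << e.what() << "), sending uncompressed\n";
            return false;
        }
        size_t compressed_size = 0;
        snappy::RawCompress(data.data(), data.size(), &scratch[0], &compressed_size);
        if (compressed_size >= data.size()) {
            return false; // compression did not help; keep the original bytes
        }
        scratch.resize(compressed_size);
        data.swap(scratch);
        return true;
    }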
diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc
index 15a1516293..16e3649bd1 100644
--- a/be/src/runtime/tmp_file_mgr.cc
+++ b/be/src/runtime/tmp_file_mgr.cc
@@ -21,6 +21,7 @@
 #include <boost/uuid/random_generator.hpp>
 #include <boost/uuid/uuid_io.hpp>
 #include <filesystem>
+#include <random>
 
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
@@ -160,6 +161,14 @@ string TmpFileMgr::get_tmp_dir_path(DeviceId device_id) const {
     return _tmp_dirs[device_id].path();
 }
 
+std::string TmpFileMgr::get_tmp_dir_path() {
+    std::vector<DeviceId> devices = active_tmp_devices();
+    std::random_device rd;
+    std::mt19937 g(rd());
+    std::shuffle(devices.begin(), devices.end(), g);
+    return get_tmp_dir_path(devices.front());
+}
+
 void TmpFileMgr::blacklist_device(DeviceId device_id) {
     DCHECK(_initialized);
     DCHECK(device_id >= 0 && device_id < _tmp_dirs.size());
diff --git a/be/src/runtime/tmp_file_mgr.h b/be/src/runtime/tmp_file_mgr.h
index f9d2799ce0..4db957294f 100644
--- a/be/src/runtime/tmp_file_mgr.h
+++ b/be/src/runtime/tmp_file_mgr.h
@@ -126,6 +126,9 @@ public:
     // Return the scratch directory path for the device.
     std::string get_tmp_dir_path(DeviceId device_id) const;
 
+    // Return a random scratch directory path from the devices.
+    std::string get_tmp_dir_path();
+
     // Total number of devices with tmp directories that are active. There is one tmp
     // directory per device.
     int num_active_tmp_devices();
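
The new get_tmp_dir_path() overload spreads temp files across scratch devices by shuffling the active-device list and taking the front element. Shuffling permutes the whole vector to make a single pick; sampling one index is equivalent and avoids the permutation. A sketch of that alternative (pick_random_dir is a hypothetical helper, not part of the patch):

    #include <random>
    #include <string>
    #include <vector>

    // Pick one scratch directory uniformly at random; 'dirs' must be non-empty.
    std::string pick_random_dir(const std::vector<std::string>& dirs) {
        static thread_local std::mt19937 gen{std::random_device{}()};
        std::uniform_int_distribution<size_t> dist(0, dirs.size() - 1);
        return dirs[dist(gen)];
    }

With only a handful of scratch devices the shuffle cost is negligible; both approaches give a uniform choice.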
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 01898914b5..0cd79d4fbe 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -42,8 +42,7 @@ BRpcService::~BRpcService() {}
 
 Status BRpcService::start(int port) {
     // Add service
-    _server->AddService(new PInternalServiceImpl<PBackendService>(_exec_env),
-                        brpc::SERVER_OWNS_SERVICE);
+    _server->AddService(new PInternalServiceImpl(_exec_env), brpc::SERVER_OWNS_SERVICE);
     // start service
     brpc::ServerOptions options;
     if (config::brpc_num_threads != -1) {
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 9fa45b9a66..d9e5756210 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -49,25 +49,22 @@ static void thread_context_deleter(void* d) {
     delete static_cast<ThreadContext*>(d);
 }
 
-template <typename T>
-PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env)
+PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env), _tablet_worker_pool(config::number_tablet_writer_threads, 10240) {
     REGISTER_HOOK_METRIC(add_batch_task_queue_size,
                          [this]() { return _tablet_worker_pool.get_queue_size(); });
     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
 }
 
-template <typename T>
-PInternalServiceImpl<T>::~PInternalServiceImpl() {
+PInternalServiceImpl::~PInternalServiceImpl() {
    DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
     CHECK_EQ(0, bthread_key_delete(btls_key));
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cntl_base,
-                                            const PTransmitDataParams* request,
-                                            PTransmitDataResult* response,
-                                            google::protobuf::Closure* done) {
+void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_base,
+                                         const PTransmitDataParams* request,
+                                         PTransmitDataResult* response,
+                                         google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
              << " node=" << request->node_id();
@@ -89,11 +86,10 @@ void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
     }
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController* controller,
-                                                 const PTabletWriterOpenRequest* request,
-                                                 PTabletWriterOpenResult* response,
-                                                 google::protobuf::Closure* done) {
+void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* controller,
+                                              const PTabletWriterOpenRequest* request,
+                                              PTabletWriterOpenResult* response,
+                                              google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id()
              << ", txn_id=" << request->txn_id();
@@ -107,11 +103,10 @@ void PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController
     st.to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
-                                                 const PExecPlanFragmentRequest* request,
-                                                 PExecPlanFragmentResult* response,
-                                                 google::protobuf::Closure* done) {
+void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
+                                              const PExecPlanFragmentRequest* request,
+                                              PExecPlanFragmentResult* response,
+                                              google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
@@ -123,11 +118,10 @@ void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
     st.to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::tablet_writer_add_block(google::protobuf::RpcController* cntl_base,
-                                                      const PTabletWriterAddBlockRequest* request,
-                                                      PTabletWriterAddBlockResult* response,
-                                                      google::protobuf::Closure* done) {
+void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base,
+                                                   const PTabletWriterAddBlockRequest* request,
+                                                   PTabletWriterAddBlockResult* response,
+                                                   google::protobuf::Closure* done) {
     VLOG_RPC << "tablet writer add block, id=" << request->id()
              << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id()
             << ", current_queued_size=" << _tablet_worker_pool.get_queue_size();
@@ -154,11 +148,10 @@ void PInternalServiceImpl<T>::tablet_writer_add_block(google::protobuf::RpcContr
     });
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base,
-                                                      const PTabletWriterAddBatchRequest* request,
-                                                      PTabletWriterAddBatchResult* response,
-                                                      google::protobuf::Closure* done) {
+void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base,
+                                                   const PTabletWriterAddBatchRequest* request,
+                                                   PTabletWriterAddBatchResult* response,
+                                                   google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer add batch, id=" << request->id()
              << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id()
@@ -189,11 +182,10 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
     });
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcController* controller,
-                                                   const PTabletWriterCancelRequest* request,
-                                                   PTabletWriterCancelResult* response,
-                                                   google::protobuf::Closure* done) {
+void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller,
+                                                const PTabletWriterCancelRequest* request,
+                                                PTabletWriterCancelResult* response,
+                                                google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer cancel, id=" << request->id()
             << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id();
@@ -206,8 +198,7 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
     }
 }
 
-template <typename T>
-Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& ser_request, bool compact) {
+Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request, bool compact) {
     TExecPlanFragmentParams t_request;
     {
         const uint8_t* buf = (const uint8_t*)ser_request.data();
@@ -217,11 +208,10 @@ Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& ser_reque
     return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcController* cntl_base,
-                                                   const PCancelPlanFragmentRequest* request,
-                                                   PCancelPlanFragmentResult* result,
-                                                   google::protobuf::Closure* done) {
+void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base,
+                                                const PCancelPlanFragmentRequest* request,
+                                                PCancelPlanFragmentResult* result,
+                                                google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId tid;
@@ -243,20 +233,18 @@ void PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcControll
     st.to_protobuf(result->mutable_status());
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_base,
-                                         const PFetchDataRequest* request, PFetchDataResult* result,
-                                         google::protobuf::Closure* done) {
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base,
+                                      const PFetchDataRequest* request, PFetchDataResult* result,
+                                      google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
     _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controller,
-                                       const PProxyRequest* request, PProxyResult* response,
-                                       google::protobuf::Closure* done) {
+void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
+                                    const PProxyRequest* request, PProxyResult* response,
+                                    google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     // PProxyRequest is defined in gensrc/proto/internal_service.proto
@@ -315,41 +303,34 @@ void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
     Status::OK().to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::update_cache(google::protobuf::RpcController* controller,
-                                           const PUpdateCacheRequest* request,
-                                           PCacheResponse* response,
-                                           google::protobuf::Closure* done) {
+void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller,
+                                        const PUpdateCacheRequest* request,
+                                        PCacheResponse* response, google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->update(request, response);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::fetch_cache(google::protobuf::RpcController* controller,
-                                          const PFetchCacheRequest* request,
-                                          PFetchCacheResult* result,
-                                          google::protobuf::Closure* done) {
+void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller,
+                                       const PFetchCacheRequest* request, PFetchCacheResult* result,
+                                       google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->fetch(request, result);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* controller,
-                                          const PClearCacheRequest* request,
-                                          PCacheResponse* response,
-                                          google::protobuf::Closure* done) {
+void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller,
+                                       const PClearCacheRequest* request, PCacheResponse* response,
+                                       google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->clear(request, response);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* controller,
-                                           const ::doris::PMergeFilterRequest* request,
-                                           ::doris::PMergeFilterResponse* response,
-                                           ::google::protobuf::Closure* done) {
+void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller,
+                                        const ::doris::PMergeFilterRequest* request,
+                                        ::doris::PMergeFilterResponse* response,
+                                        ::google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto buf = static_cast<brpc::Controller*>(controller)->request_attachment();
@@ -360,11 +341,10 @@ void PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* co
     st.to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* controller,
-                                           const ::doris::PPublishFilterRequest* request,
-                                           ::doris::PPublishFilterResponse* response,
-                                           ::google::protobuf::Closure* done) {
+void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller,
+                                        const ::doris::PPublishFilterRequest* request,
+                                        ::doris::PPublishFilterResponse* response,
+                                        ::google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
@@ -378,10 +358,9 @@ void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
     st.to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* controller,
-                                        const PSendDataRequest* request, PSendDataResult* response,
-                                        google::protobuf::Closure* done) {
+void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller,
+                                     const PSendDataRequest* request, PSendDataResult* response,
+                                     google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
@@ -402,10 +381,9 @@ void PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* control
     }
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::commit(google::protobuf::RpcController* controller,
-                                     const PCommitRequest* request, PCommitResult* response,
-                                     google::protobuf::Closure* done) {
+void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
+                                  const PCommitRequest* request, PCommitResult* response,
+                                  google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
@@ -421,10 +399,9 @@ void PInternalServiceImpl<T>::commit(google::protobuf::RpcController* controller
     }
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controller,
-                                       const PRollbackRequest* request, PRollbackResult* response,
-                                       google::protobuf::Closure* done) {
+void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller,
+                                    const PRollbackRequest* request, PRollbackResult* response,
+                                    google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
@@ -440,11 +417,10 @@ void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controll
     }
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController* cntl_base,
-                                                 const PConstantExprRequest* request,
-                                                 PConstantExprResult* response,
-                                                 google::protobuf::Closure* done) {
+void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* cntl_base,
+                                              const PConstantExprRequest* request,
+                                              PConstantExprResult* response,
+                                              google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
@@ -462,9 +438,8 @@ void PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController
     st.to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-Status PInternalServiceImpl<T>::_fold_constant_expr(const std::string& ser_request,
-                                                    PConstantExprResult* response) {
+Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request,
+                                                 PConstantExprResult* response) {
     TFoldConstantParams t_request;
     {
         const uint8_t* buf = (const uint8_t*)ser_request.data();
@@ -477,11 +452,10 @@ Status PInternalServiceImpl<T>::_fold_constant_expr(const std::string& ser_reque
     return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cntl_base,
-                                             const PTransmitDataParams* request,
-                                             PTransmitDataResult* response,
-                                             google::protobuf::Closure* done) {
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_base,
+                                          const PTransmitDataParams* request,
+                                          PTransmitDataResult* response,
+                                          google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
              << " node=" << request->node_id();
@@ -503,11 +477,10 @@ void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cn
     }
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController* controller,
-                                                const PCheckRPCChannelRequest* request,
-                                                PCheckRPCChannelResponse* response,
-                                                google::protobuf::Closure* done) {
+void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* controller,
+                                             const PCheckRPCChannelRequest* request,
+                                             PCheckRPCChannelResponse* response,
+                                             google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(0);
@@ -531,11 +504,10 @@ void PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController*
     }
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController* controller,
-                                                const PResetRPCChannelRequest* request,
-                                                PResetRPCChannelResponse* response,
-                                                google::protobuf::Closure* done) {
+void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* controller,
+                                             const PResetRPCChannelRequest* request,
+                                             PResetRPCChannelResponse* response,
+                                             google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(0);
@@ -566,11 +538,10 @@ void PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController*
     }
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* cntl_base,
-                                         const PHandShakeRequest* request,
-                                         PHandShakeResponse* response,
-                                         google::protobuf::Closure* done) {
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base,
+                                      const PHandShakeRequest* request,
+                                      PHandShakeResponse* response,
+                                      google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     if (request->has_hello()) {
@@ -579,6 +550,4 @@ void PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* cntl_b
     response->mutable_status()->set_status_code(0);
 }
 
-template class PInternalServiceImpl<PBackendService>;
-
 } // namespace doris
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index ce4913701d..18a1667f56 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -30,8 +30,7 @@ namespace doris {
 
 class ExecEnv;
 
-template <typename T>
-class PInternalServiceImpl : public T {
+class PInternalServiceImpl : public PBackendService {
 public:
     PInternalServiceImpl(ExecEnv* exec_env);
     virtual ~PInternalServiceImpl();
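
Since PBackendService was the only instantiation ever made (note the removed explicit "template class PInternalServiceImpl<PBackendService>;"), the template parameter carried no information; the class now derives from PBackendService directly and is registered as a concrete type. A minimal registration sketch, assuming only the brpc server API shown in the brpc_service.cpp hunk (start_backend_service is a hypothetical wrapper):

    #include <brpc/server.h>

    // SERVER_OWNS_SERVICE tells the server to delete the service on shutdown,
    // matching the registration in BRpcService::start().
    int start_backend_service(ExecEnv* exec_env, int port) {
        static brpc::Server server;
        if (server.AddService(new PInternalServiceImpl(exec_env),
                              brpc::SERVER_OWNS_SERVICE) != 0) {
            return -1; // AddService returns non-zero on failure
        }
        brpc::ServerOptions options;
        return server.Start(port, &options); // 0 on success
    }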
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org