This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6834fb23ca [fix](s3) fix S3 temp file write that may fail because
there is no space on disk (#9421)
6834fb23ca is described below

commit 6834fb23cadb7c065ac16aa0e27b73060a3951ed
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Mon May 9 09:28:43 2022 +0800

    [fix](s3) fix S3 temp file write that may fail because there is no space
on disk (#9421)
---
 be/src/common/config.h              |   1 -
 be/src/exec/s3_writer.cpp           |  36 +++++--
 be/src/runtime/row_batch.cpp        |  54 ++++++----
 be/src/runtime/tmp_file_mgr.cc      |   9 ++
 be/src/runtime/tmp_file_mgr.h       |   3 +
 be/src/service/brpc_service.cpp     |   3 +-
 be/src/service/internal_service.cpp | 201 +++++++++++++++---------------------
 be/src/service/internal_service.h   |   3 +-
 8 files changed, 159 insertions(+), 151 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 28714c6b24..1c4c91160e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -737,7 +737,6 @@ CONF_Validator(string_type_length_soft_limit_bytes,
 // used for olap scanner to save memory, when the size of unused_object_pool
 // is greater than object_pool_buffer_size, release the object in the 
unused_object_pool.
 CONF_Int32(object_pool_buffer_size, "100");
-
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/s3_writer.cpp b/be/src/exec/s3_writer.cpp
index 17e64a4e28..8b44c621d5 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/exec/s3_writer.cpp
@@ -23,6 +23,8 @@
 #include <aws/s3/model/PutObjectRequest.h>
 
 #include "common/logging.h"
+#include "runtime/exec_env.h"
+#include "runtime/tmp_file_mgr.h"
 #include "service/backend_options.h"
 #include "util/s3_uri.h"
 #include "util/s3_util.h"
@@ -41,10 +43,15 @@ S3Writer::S3Writer(const std::map<std::string, 
std::string>& properties, const s
         : _properties(properties),
           _path(path),
           _uri(path),
-          _client(ClientFactory::instance().create(_properties)),
-          _temp_file(std::make_shared<Aws::Utils::TempFile>(
-                  std::ios_base::binary | std::ios_base::trunc | 
std::ios_base::in |
-                  std::ios_base::out)) {
+          _client(ClientFactory::instance().create(_properties)) {
+    std::string tmp_path = 
ExecEnv::GetInstance()->tmp_file_mgr()->get_tmp_dir_path();
+    LOG(INFO) << "init aws s3 client with tmp path " << tmp_path;
+    if (tmp_path.at(tmp_path.size() - 1) != '/') {
+        tmp_path.append("/");
+    }
+    _temp_file = std::make_shared<Aws::Utils::TempFile>(
+            tmp_path.c_str(), ".doris_tmp",
+            std::ios_base::binary | std::ios_base::trunc | std::ios_base::in | 
std::ios_base::out);
     DCHECK(_client) << "init aws s3 client error.";
 }
 
@@ -78,13 +85,19 @@ Status S3Writer::write(const uint8_t* buf, size_t buf_len, 
size_t* written_len)
         return Status::OK();
     }
     if (!_temp_file) {
-        return Status::BufferAllocFailed("The internal temporary file is not 
writable. at " +
-                                         BackendOptions::get_localhost());
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::BufferAllocFailed(
+                        fmt::format("The internal temporary file is not 
writable for {}. at {}",
+                                    strerror(errno), 
BackendOptions::get_localhost())),
+                "write temp file error");
     }
     _temp_file->write(reinterpret_cast<const char*>(buf), buf_len);
     if (!_temp_file->good()) {
-        return Status::BufferAllocFailed("Could not append to the internal 
temporary file. at " +
-                                         BackendOptions::get_localhost());
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::BufferAllocFailed(
+                        fmt::format("Could not append to the internal 
temporary file for {}. at {}",
+                                    strerror(errno), 
BackendOptions::get_localhost())),
+                "write temp file error");
     }
     *written_len = buf_len;
     return Status::OK();
@@ -100,8 +113,11 @@ Status S3Writer::close() {
 
 Status S3Writer::_sync() {
     if (!_temp_file) {
-        return Status::BufferAllocFailed("The internal temporary file is not 
writable. at " +
-                                         BackendOptions::get_localhost());
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::BufferAllocFailed(
+                        fmt::format("The internal temporary file is not 
writable for {}. at {}",
+                                    strerror(errno), 
BackendOptions::get_localhost())),
+                "write temp file error");
     }
     CHECK_S3_CLIENT(_client);
     Aws::S3::Model::PutObjectRequest request;
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index f8727abe6b..54b2e7680e 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -157,7 +157,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const 
PRowBatch& input_batch)
             for (auto slot : desc->string_slots()) {
                 DCHECK(slot->type().is_string_type());
                 StringValue* string_val = 
tuple->get_string_slot(slot->tuple_offset());
-                int offset = convert_to<int>(string_val->ptr);
+                int64_t offset = convert_to<int64_t>(string_val->ptr);
                 string_val->ptr = tuple_data + offset;
 
                 // Why we do this mask? Field len of StringValue is changed 
from int to size_t in
@@ -225,10 +225,10 @@ Status RowBatch::serialize(PRowBatch* output_batch, 
size_t* uncompressed_size,
     // is_compressed
     output_batch->set_is_compressed(false);
     // tuple data
-    size_t size = total_byte_size();
+    size_t tuple_byte_size = total_byte_size();
     std::string* mutable_tuple_data = nullptr;
     if (allocated_buf != nullptr) {
-        allocated_buf->resize(size);
+        allocated_buf->resize(tuple_byte_size);
         // all tuple data will be written in the allocated_buf
         // instead of tuple_data in PRowBatch
         mutable_tuple_data = allocated_buf;
@@ -236,7 +236,7 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* 
uncompressed_size,
         output_batch->set_tuple_data("");
     } else {
         mutable_tuple_data = output_batch->mutable_tuple_data();
-        mutable_tuple_data->resize(size);
+        mutable_tuple_data->resize(tuple_byte_size);
     }
 
     // Copy tuple data, including strings, into output_batch (converting string
@@ -261,37 +261,51 @@ Status RowBatch::serialize(PRowBatch* output_batch, 
size_t* uncompressed_size,
             mutable_tuple_offsets->Add((int32_t)offset);
             mutable_new_tuple_offsets->Add(offset);
             row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* 
convert_ptrs */ true);
-            CHECK_LE(offset, size);
+            CHECK_LE(offset, tuple_byte_size);
         }
     }
-    CHECK_EQ(offset, size) << "offset: " << offset << " vs. size: " << size;
-
-    if (config::compress_rowbatches && size > 0) {
-        // Try compressing tuple_data to _compression_scratch, swap if 
compressed data is
-        // smaller
-        uint32_t max_compressed_size = snappy::MaxCompressedLength(size);
-
-        if (_compression_scratch.size() < max_compressed_size) {
+    CHECK_EQ(offset, tuple_byte_size)
+            << "offset: " << offset << " vs. tuple_byte_size: " << 
tuple_byte_size;
+
+    size_t max_compressed_size = snappy::MaxCompressedLength(tuple_byte_size);
+    bool can_compress = config::compress_rowbatches && tuple_byte_size > 0;
+    if (can_compress) {
+        try {
+            // Allocation of extra-long contiguous memory may fail, and data 
compression cannot be used if it fails
             _compression_scratch.resize(max_compressed_size);
+        } catch (const std::bad_alloc& e) {
+            can_compress = false;
+            LOG(WARNING) << "Try to alloc " << max_compressed_size
+                         << " bytes for compression scratch failed. " << 
e.what();
+        } catch (...) {
+            can_compress = false;
+            std::exception_ptr p = std::current_exception();
+            LOG(WARNING) << "Try to alloc " << max_compressed_size
+                         << " bytes for compression scratch failed. "
+                         << (p ? p.__cxa_exception_type()->name() : "null");
         }
-
+    }
+    if (can_compress) {
+        // Try compressing tuple_data to _compression_scratch, swap if 
compressed data is
+        // smaller
         size_t compressed_size = 0;
         char* compressed_output = _compression_scratch.data();
-        snappy::RawCompress(mutable_tuple_data->data(), size, 
compressed_output, &compressed_size);
-
-        if (LIKELY(compressed_size < size)) {
+        snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size, 
compressed_output,
+                            &compressed_size);
+        if (LIKELY(compressed_size < tuple_byte_size)) {
             _compression_scratch.resize(compressed_size);
             mutable_tuple_data->swap(_compression_scratch);
             output_batch->set_is_compressed(true);
         }
 
-        VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << 
compressed_size;
+        VLOG_ROW << "uncompressed tuple_byte_size: " << tuple_byte_size
+                 << ", compressed size: " << compressed_size;
     }
 
     // return compressed and uncompressed size
     size_t pb_size = get_batch_size(*output_batch);
     if (allocated_buf == nullptr) {
-        *uncompressed_size = pb_size - mutable_tuple_data->size() + size;
+        *uncompressed_size = pb_size - mutable_tuple_data->size() + 
tuple_byte_size;
         *compressed_size = pb_size;
         if (pb_size > std::numeric_limits<int32_t>::max()) {
             // the protobuf has a hard limit of 2GB for serialized data.
@@ -302,7 +316,7 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* 
uncompressed_size,
                                 pb_size));
         }
     } else {
-        *uncompressed_size = pb_size + size;
+        *uncompressed_size = pb_size + tuple_byte_size;
         *compressed_size = pb_size + mutable_tuple_data->size();
     }
     return Status::OK();
diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc
index 15a1516293..16e3649bd1 100644
--- a/be/src/runtime/tmp_file_mgr.cc
+++ b/be/src/runtime/tmp_file_mgr.cc
@@ -21,6 +21,7 @@
 #include <boost/uuid/random_generator.hpp>
 #include <boost/uuid/uuid_io.hpp>
 #include <filesystem>
+#include <random>
 
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
@@ -160,6 +161,14 @@ string TmpFileMgr::get_tmp_dir_path(DeviceId device_id) 
const {
     return _tmp_dirs[device_id].path();
 }
 
+std::string TmpFileMgr::get_tmp_dir_path() {
+    std::vector<DeviceId> devices = active_tmp_devices();
+    std::random_device rd;
+    std::mt19937 g(rd());
+    std::shuffle(devices.begin(), devices.end(), g);
+    return get_tmp_dir_path(devices.front());
+}
+
 void TmpFileMgr::blacklist_device(DeviceId device_id) {
     DCHECK(_initialized);
     DCHECK(device_id >= 0 && device_id < _tmp_dirs.size());
diff --git a/be/src/runtime/tmp_file_mgr.h b/be/src/runtime/tmp_file_mgr.h
index f9d2799ce0..4db957294f 100644
--- a/be/src/runtime/tmp_file_mgr.h
+++ b/be/src/runtime/tmp_file_mgr.h
@@ -126,6 +126,9 @@ public:
     // Return the scratch directory path for the device.
     std::string get_tmp_dir_path(DeviceId device_id) const;
 
+    // Return a random scratch directory path from the devices.
+    std::string get_tmp_dir_path();
+
     // Total number of devices with tmp directories that are active. There is 
one tmp
     // directory per device.
     int num_active_tmp_devices();
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 01898914b5..0cd79d4fbe 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -42,8 +42,7 @@ BRpcService::~BRpcService() {}
 
 Status BRpcService::start(int port) {
     // Add service
-    _server->AddService(new PInternalServiceImpl<PBackendService>(_exec_env),
-                        brpc::SERVER_OWNS_SERVICE);
+    _server->AddService(new PInternalServiceImpl(_exec_env), 
brpc::SERVER_OWNS_SERVICE);
     // start service
     brpc::ServerOptions options;
     if (config::brpc_num_threads != -1) {
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 9fa45b9a66..d9e5756210 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -49,25 +49,22 @@ static void thread_context_deleter(void* d) {
     delete static_cast<ThreadContext*>(d);
 }
 
-template <typename T>
-PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env)
+PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env), 
_tablet_worker_pool(config::number_tablet_writer_threads, 10240) {
     REGISTER_HOOK_METRIC(add_batch_task_queue_size,
                          [this]() { return 
_tablet_worker_pool.get_queue_size(); });
     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
 }
 
-template <typename T>
-PInternalServiceImpl<T>::~PInternalServiceImpl() {
+PInternalServiceImpl::~PInternalServiceImpl() {
     DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
     CHECK_EQ(0, bthread_key_delete(btls_key));
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* 
cntl_base,
-                                            const PTransmitDataParams* request,
-                                            PTransmitDataResult* response,
-                                            google::protobuf::Closure* done) {
+void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* 
cntl_base,
+                                         const PTransmitDataParams* request,
+                                         PTransmitDataResult* response,
+                                         google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     VLOG_ROW << "transmit data: fragment_instance_id=" << 
print_id(request->finst_id())
              << " node=" << request->node_id();
@@ -89,11 +86,10 @@ void 
PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
     }
 }
 
-template <typename T>
-void 
PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController* 
controller,
-                                                 const 
PTabletWriterOpenRequest* request,
-                                                 PTabletWriterOpenResult* 
response,
-                                                 google::protobuf::Closure* 
done) {
+void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* 
controller,
+                                              const PTabletWriterOpenRequest* 
request,
+                                              PTabletWriterOpenResult* 
response,
+                                              google::protobuf::Closure* done) 
{
     SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << 
request->index_id()
              << ", txn_id=" << request->txn_id();
@@ -107,11 +103,10 @@ void 
PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController
     st.to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-void 
PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController* 
cntl_base,
-                                                 const 
PExecPlanFragmentRequest* request,
-                                                 PExecPlanFragmentResult* 
response,
-                                                 google::protobuf::Closure* 
done) {
+void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* 
cntl_base,
+                                              const PExecPlanFragmentRequest* 
request,
+                                              PExecPlanFragmentResult* 
response,
+                                              google::protobuf::Closure* done) 
{
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
@@ -123,11 +118,10 @@ void 
PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
     st.to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-void 
PInternalServiceImpl<T>::tablet_writer_add_block(google::protobuf::RpcController*
 cntl_base,
-                                                      const 
PTabletWriterAddBlockRequest* request,
-                                                      
PTabletWriterAddBlockResult* response,
-                                                      
google::protobuf::Closure* done) {
+void 
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* 
cntl_base,
+                                                   const 
PTabletWriterAddBlockRequest* request,
+                                                   
PTabletWriterAddBlockResult* response,
+                                                   google::protobuf::Closure* 
done) {
     VLOG_RPC << "tablet writer add block, id=" << request->id()
              << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id()
              << ", current_queued_size=" << 
_tablet_worker_pool.get_queue_size();
@@ -154,11 +148,10 @@ void 
PInternalServiceImpl<T>::tablet_writer_add_block(google::protobuf::RpcContr
     });
 }
 
-template <typename T>
-void 
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcController*
 cntl_base,
-                                                      const 
PTabletWriterAddBatchRequest* request,
-                                                      
PTabletWriterAddBatchResult* response,
-                                                      
google::protobuf::Closure* done) {
+void 
PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* 
cntl_base,
+                                                   const 
PTabletWriterAddBatchRequest* request,
+                                                   
PTabletWriterAddBatchResult* response,
+                                                   google::protobuf::Closure* 
done) {
     SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer add batch, id=" << request->id()
              << ", index_id=" << request->index_id() << ", sender_id=" << 
request->sender_id()
@@ -189,11 +182,10 @@ void 
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
     });
 }
 
-template <typename T>
-void 
PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcController* 
controller,
-                                                   const 
PTabletWriterCancelRequest* request,
-                                                   PTabletWriterCancelResult* 
response,
-                                                   google::protobuf::Closure* 
done) {
+void 
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* 
controller,
+                                                const 
PTabletWriterCancelRequest* request,
+                                                PTabletWriterCancelResult* 
response,
+                                                google::protobuf::Closure* 
done) {
     SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" 
<< request->index_id()
              << ", sender_id=" << request->sender_id();
@@ -206,8 +198,7 @@ void 
PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
     }
 }
 
-template <typename T>
-Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string& 
ser_request, bool compact) {
+Status PInternalServiceImpl::_exec_plan_fragment(const std::string& 
ser_request, bool compact) {
     TExecPlanFragmentParams t_request;
     {
         const uint8_t* buf = (const uint8_t*)ser_request.data();
@@ -217,11 +208,10 @@ Status PInternalServiceImpl<T>::_exec_plan_fragment(const 
std::string& ser_reque
     return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
 }
 
-template <typename T>
-void 
PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcController* 
cntl_base,
-                                                   const 
PCancelPlanFragmentRequest* request,
-                                                   PCancelPlanFragmentResult* 
result,
-                                                   google::protobuf::Closure* 
done) {
+void 
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* 
cntl_base,
+                                                const 
PCancelPlanFragmentRequest* request,
+                                                PCancelPlanFragmentResult* 
result,
+                                                google::protobuf::Closure* 
done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId tid;
@@ -243,20 +233,18 @@ void 
PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcControll
     st.to_protobuf(result->mutable_status());
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* 
cntl_base,
-                                         const PFetchDataRequest* request, 
PFetchDataResult* result,
-                                         google::protobuf::Closure* done) {
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* 
cntl_base,
+                                      const PFetchDataRequest* request, 
PFetchDataResult* result,
+                                      google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
     _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* 
controller,
-                                       const PProxyRequest* request, 
PProxyResult* response,
-                                       google::protobuf::Closure* done) {
+void PInternalServiceImpl::get_info(google::protobuf::RpcController* 
controller,
+                                    const PProxyRequest* request, 
PProxyResult* response,
+                                    google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     // PProxyRequest is defined in gensrc/proto/internal_service.proto
@@ -315,41 +303,34 @@ void 
PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
     Status::OK().to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::update_cache(google::protobuf::RpcController* 
controller,
-                                           const PUpdateCacheRequest* request,
-                                           PCacheResponse* response,
-                                           google::protobuf::Closure* done) {
+void PInternalServiceImpl::update_cache(google::protobuf::RpcController* 
controller,
+                                        const PUpdateCacheRequest* request,
+                                        PCacheResponse* response, 
google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->update(request, response);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::fetch_cache(google::protobuf::RpcController* 
controller,
-                                          const PFetchCacheRequest* request,
-                                          PFetchCacheResult* result,
-                                          google::protobuf::Closure* done) {
+void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* 
controller,
+                                       const PFetchCacheRequest* request, 
PFetchCacheResult* result,
+                                       google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->fetch(request, result);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* 
controller,
-                                          const PClearCacheRequest* request,
-                                          PCacheResponse* response,
-                                          google::protobuf::Closure* done) {
+void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* 
controller,
+                                       const PClearCacheRequest* request, 
PCacheResponse* response,
+                                       google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->clear(request, response);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* 
controller,
-                                           const ::doris::PMergeFilterRequest* 
request,
-                                           ::doris::PMergeFilterResponse* 
response,
-                                           ::google::protobuf::Closure* done) {
+void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* 
controller,
+                                        const ::doris::PMergeFilterRequest* 
request,
+                                        ::doris::PMergeFilterResponse* 
response,
+                                        ::google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto buf = 
static_cast<brpc::Controller*>(controller)->request_attachment();
@@ -360,11 +341,10 @@ void 
PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* co
     st.to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* 
controller,
-                                           const 
::doris::PPublishFilterRequest* request,
-                                           ::doris::PPublishFilterResponse* 
response,
-                                           ::google::protobuf::Closure* done) {
+void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* 
controller,
+                                        const ::doris::PPublishFilterRequest* 
request,
+                                        ::doris::PPublishFilterResponse* 
response,
+                                        ::google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
@@ -378,10 +358,9 @@ void 
PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
     st.to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* 
controller,
-                                        const PSendDataRequest* request, 
PSendDataResult* response,
-                                        google::protobuf::Closure* done) {
+void PInternalServiceImpl::send_data(google::protobuf::RpcController* 
controller,
+                                     const PSendDataRequest* request, 
PSendDataResult* response,
+                                     google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
@@ -402,10 +381,9 @@ void 
PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* control
     }
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::commit(google::protobuf::RpcController* 
controller,
-                                     const PCommitRequest* request, 
PCommitResult* response,
-                                     google::protobuf::Closure* done) {
+void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
+                                  const PCommitRequest* request, 
PCommitResult* response,
+                                  google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
@@ -421,10 +399,9 @@ void 
PInternalServiceImpl<T>::commit(google::protobuf::RpcController* controller
     }
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* 
controller,
-                                       const PRollbackRequest* request, 
PRollbackResult* response,
-                                       google::protobuf::Closure* done) {
+void PInternalServiceImpl::rollback(google::protobuf::RpcController* 
controller,
+                                    const PRollbackRequest* request, 
PRollbackResult* response,
+                                    google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
@@ -440,11 +417,10 @@ void 
PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controll
     }
 }
 
-template <typename T>
-void 
PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController* 
cntl_base,
-                                                 const PConstantExprRequest* 
request,
-                                                 PConstantExprResult* response,
-                                                 google::protobuf::Closure* 
done) {
+void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* 
cntl_base,
+                                              const PConstantExprRequest* 
request,
+                                              PConstantExprResult* response,
+                                              google::protobuf::Closure* done) 
{
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
@@ -462,9 +438,8 @@ void 
PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController
     st.to_protobuf(response->mutable_status());
 }
 
-template <typename T>
-Status PInternalServiceImpl<T>::_fold_constant_expr(const std::string& 
ser_request,
-                                                    PConstantExprResult* 
response) {
+Status PInternalServiceImpl::_fold_constant_expr(const std::string& 
ser_request,
+                                                 PConstantExprResult* 
response) {
     TFoldConstantParams t_request;
     {
         const uint8_t* buf = (const uint8_t*)ser_request.data();
@@ -477,11 +452,10 @@ Status PInternalServiceImpl<T>::_fold_constant_expr(const 
std::string& ser_reque
     return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* 
cntl_base,
-                                             const PTransmitDataParams* 
request,
-                                             PTransmitDataResult* response,
-                                             google::protobuf::Closure* done) {
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* 
cntl_base,
+                                          const PTransmitDataParams* request,
+                                          PTransmitDataResult* response,
+                                          google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     VLOG_ROW << "transmit data: fragment_instance_id=" << 
print_id(request->finst_id())
              << " node=" << request->node_id();
@@ -503,11 +477,10 @@ void 
PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cn
     }
 }
 
-template <typename T>
-void 
PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController* 
controller,
-                                                const PCheckRPCChannelRequest* 
request,
-                                                PCheckRPCChannelResponse* 
response,
-                                                google::protobuf::Closure* 
done) {
+void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* 
controller,
+                                             const PCheckRPCChannelRequest* 
request,
+                                             PCheckRPCChannelResponse* 
response,
+                                             google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(0);
@@ -531,11 +504,10 @@ void 
PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController*
     }
 }
 
-template <typename T>
-void 
PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController* 
controller,
-                                                const PResetRPCChannelRequest* 
request,
-                                                PResetRPCChannelResponse* 
response,
-                                                google::protobuf::Closure* 
done) {
+void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* 
controller,
+                                             const PResetRPCChannelRequest* 
request,
+                                             PResetRPCChannelResponse* 
response,
+                                             google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(0);
@@ -566,11 +538,10 @@ void 
PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController*
     }
 }
 
-template <typename T>
-void PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* 
cntl_base,
-                                         const PHandShakeRequest* request,
-                                         PHandShakeResponse* response,
-                                         google::protobuf::Closure* done) {
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* 
cntl_base,
+                                      const PHandShakeRequest* request,
+                                      PHandShakeResponse* response,
+                                      google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     if (request->has_hello()) {
@@ -579,6 +550,4 @@ void 
PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* cntl_b
     response->mutable_status()->set_status_code(0);
 }
 
-template class PInternalServiceImpl<PBackendService>;
-
 } // namespace doris
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index ce4913701d..18a1667f56 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -30,8 +30,7 @@ namespace doris {
 
 class ExecEnv;
 
-template <typename T>
-class PInternalServiceImpl : public T {
+class PInternalServiceImpl : public PBackendService {
 public:
     PInternalServiceImpl(ExecEnv* exec_env);
     virtual ~PInternalServiceImpl();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to