This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3899c08036 [optimize](compile) remove unused template param from load 
channel (#18980)
3899c08036 is described below

commit 3899c0803661c41cdf1f9159644f2a8b3c2a1b10
Author: yiguolei <676222...@qq.com>
AuthorDate: Mon Apr 24 23:36:47 2023 +0800

    [optimize](compile) remove unused template param from load channel (#18980)
    
    * [optimize](compile) remove unused template param from load channel
    
    
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/runtime/load_channel.cpp     | 54 ++++++++++++++++++++++++++++++
 be/src/runtime/load_channel.h       | 64 +++--------------------------------
 be/src/runtime/load_channel_mgr.cpp | 57 +++++++++++++++++++++++++++++++
 be/src/runtime/load_channel_mgr.h   | 67 +++----------------------------------
 be/src/runtime/tablets_channel.cpp  |  9 ++---
 be/src/runtime/tablets_channel.h    |  5 +--
 6 files changed, 124 insertions(+), 132 deletions(-)

diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index b29f760486..507a214cc7 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -104,6 +104,60 @@ Status 
LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& channe
     return Status::OK();
 }
 
+Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
+                              PTabletWriterAddBlockResult* response) {
+    int64_t index_id = request.index_id();
+    // 1. get tablets channel
+    std::shared_ptr<TabletsChannel> channel;
+    bool is_finished;
+    Status st = _get_tablets_channel(channel, is_finished, index_id);
+    if (!st.ok() || is_finished) {
+        return st;
+    }
+
+    // 2. add block to tablets channel
+    if (request.has_block()) {
+        RETURN_IF_ERROR(channel->add_batch(request, response));
+        _add_batch_number_counter->update(1);
+    }
+
+    // 3. handle eos
+    if (request.has_eos() && request.eos()) {
+        st = _handle_eos(channel, request, response);
+        _report_profile(response);
+        if (!st.ok()) {
+            return st;
+        }
+    } else if (_add_batch_number_counter->value() % 10 == 1) {
+        _report_profile(response);
+    }
+    _last_updated_time.store(time(nullptr));
+    return st;
+}
+
+void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
+    COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption());
+    // TabletSink and LoadChannel in BE are M: N relationship,
+    // Every once in a while LoadChannel will randomly return its own runtime 
profile to a TabletSink,
+    // so usually all LoadChannel runtime profiles are saved on each 
TabletSink,
+    // and the timeliness of the same LoadChannel profile saved on different 
TabletSinks is different,
+    // and each TabletSink will periodically send fe reports all the 
LoadChannel profiles saved by itself,
+    // and ensures to update the latest LoadChannel profile according to the 
timestamp.
+    _self_profile->set_timestamp(_last_updated_time);
+
+    TRuntimeProfileTree tprofile;
+    _profile->to_thrift(&tprofile);
+    ThriftSerializer ser(false, 4096);
+    uint8_t* buf = nullptr;
+    uint32_t len = 0;
+    auto st = ser.serialize(&tprofile, &len, &buf);
+    if (st.ok()) {
+        response->set_load_channel_profile(std::string((const char*)buf, len));
+    } else {
+        LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, 
errmsg=" << st;
+    }
+}
+
 bool LoadChannel::is_finished() {
     if (!_opened) {
         return false;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index dc23defc4a..7f54480743 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/internal_service.pb.h>
 #include <stdint.h>
 #include <time.h>
 
@@ -57,8 +58,8 @@ public:
     Status open(const PTabletWriterOpenRequest& request);
 
     // this batch must belong to a index in one transaction
-    template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
-    Status add_batch(const TabletWriterAddRequest& request, 
TabletWriterAddResult* response);
+    Status add_batch(const PTabletWriterAddBlockRequest& request,
+                     PTabletWriterAddBlockResult* response);
 
     // return true if this load channel has been opened and all tablets 
channels are closed then.
     bool is_finished();
@@ -138,9 +139,8 @@ protected:
     }
 
     void _init_profile();
-    template <typename TabletWriterAddResult>
     // thread safety
-    void _report_profile(TabletWriterAddResult* response);
+    void _report_profile(PTabletWriterAddBlockResult* response);
 
 private:
     UniqueId _load_id;
@@ -177,62 +177,6 @@ private:
     int64_t _backend_id;
 };
 
-template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
-Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
-                              TabletWriterAddResult* response) {
-    int64_t index_id = request.index_id();
-    // 1. get tablets channel
-    std::shared_ptr<TabletsChannel> channel;
-    bool is_finished;
-    Status st = _get_tablets_channel(channel, is_finished, index_id);
-    if (!st.ok() || is_finished) {
-        return st;
-    }
-
-    // 2. add block to tablets channel
-    if (request.has_block()) {
-        RETURN_IF_ERROR(channel->add_batch(request, response));
-        _add_batch_number_counter->update(1);
-    }
-
-    // 3. handle eos
-    if (request.has_eos() && request.eos()) {
-        st = _handle_eos(channel, request, response);
-        _report_profile<TabletWriterAddResult>(response);
-        if (!st.ok()) {
-            return st;
-        }
-    } else if (_add_batch_number_counter->value() % 10 == 1) {
-        _report_profile<TabletWriterAddResult>(response);
-    }
-    _last_updated_time.store(time(nullptr));
-    return st;
-}
-
-template <typename TabletWriterAddResult>
-void LoadChannel::_report_profile(TabletWriterAddResult* response) {
-    COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption());
-    // TabletSink and LoadChannel in BE are M: N relationship,
-    // Every once in a while LoadChannel will randomly return its own runtime 
profile to a TabletSink,
-    // so usually all LoadChannel runtime profiles are saved on each 
TabletSink,
-    // and the timeliness of the same LoadChannel profile saved on different 
TabletSinks is different,
-    // and each TabletSink will periodically send fe reports all the 
LoadChannel profiles saved by itself,
-    // and ensures to update the latest LoadChannel profile according to the 
timestamp.
-    _self_profile->set_timestamp(_last_updated_time);
-
-    TRuntimeProfileTree tprofile;
-    _profile->to_thrift(&tprofile);
-    ThriftSerializer ser(false, 4096);
-    uint8_t* buf = nullptr;
-    uint32_t len = 0;
-    auto st = ser.serialize(&tprofile, &len, &buf);
-    if (st.ok()) {
-        response->set_load_channel_profile(std::string((const char*)buf, len));
-    } else {
-        LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, 
errmsg=" << st;
-    }
-}
-
 inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) {
     os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << 
load_channel.mem_consumption()
        << ", last_update_time=" << 
static_cast<uint64_t>(load_channel.last_updated_time())
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 068ce2261c..597423598c 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -136,6 +136,63 @@ Status LoadChannelMgr::open(const 
PTabletWriterOpenRequest& params) {
 
 static void dummy_deleter(const CacheKey& key, void* value) {}
 
+Status LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& 
channel, bool& is_eof,
+                                         const UniqueId& load_id,
+                                         const PTabletWriterAddBlockRequest& 
request) {
+    is_eof = false;
+    std::lock_guard<std::mutex> l(_lock);
+    auto it = _load_channels.find(load_id);
+    if (it == _load_channels.end()) {
+        auto handle = _last_success_channel->lookup(load_id.to_string());
+        // success only when eos be true
+        if (handle != nullptr) {
+            _last_success_channel->release(handle);
+            if (request.has_eos() && request.eos()) {
+                is_eof = true;
+                return Status::OK();
+            }
+        }
+        return Status::InternalError("fail to add batch in load channel. 
unknown load_id={}",
+                                     load_id.to_string());
+    }
+    channel = it->second;
+    return Status::OK();
+}
+
+Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request,
+                                 PTabletWriterAddBlockResult* response) {
+    UniqueId load_id(request.id());
+    // 1. get load channel
+    std::shared_ptr<LoadChannel> channel;
+    bool is_eof;
+    auto status = _get_load_channel(channel, is_eof, load_id, request);
+    if (!status.ok() || is_eof) {
+        return status;
+    }
+
+    if (!channel->is_high_priority()) {
+        // 2. check if mem consumption exceed limit
+        // If this is a high priority load task, do not handle this.
+        // because this may block for a while, which may lead to rpc timeout.
+        _handle_mem_exceed_limit();
+    }
+
+    // 3. add batch to load channel
+    // batch may not exist in request(eg: eos request without batch),
+    // this case will be handled in load channel's add batch method.
+    Status st = channel->add_batch(request, response);
+    if (UNLIKELY(!st.ok())) {
+        channel->cancel();
+        return st;
+    }
+
+    // 4. handle finish
+    if (channel->is_finished()) {
+        _finish_load_channel(load_id);
+    }
+    return Status::OK();
+}
+
 void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) {
     VLOG_NOTICE << "removing load channel " << load_id << " because it's 
finished";
     {
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index 69112724b5..cad611b75b 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/internal_service.pb.h>
 #include <stdint.h>
 
 #include <condition_variable>
@@ -24,7 +25,6 @@
 #include <mutex>
 #include <unordered_map>
 #include <utility>
-
 // IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/status.h"
@@ -54,8 +54,8 @@ public:
     // open a new load channel if not exist
     Status open(const PTabletWriterOpenRequest& request);
 
-    template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
-    Status add_batch(const TabletWriterAddRequest& request, 
TabletWriterAddResult* response);
+    Status add_batch(const PTabletWriterAddBlockRequest& request,
+                     PTabletWriterAddBlockResult* response);
 
     // cancel all tablet stream for 'load_id' load
     Status cancel(const PTabletWriterCancelRequest& request);
@@ -67,9 +67,8 @@ public:
     MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); }
 
 private:
-    template <typename Request>
     Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& 
is_eof,
-                             const UniqueId& load_id, const Request& request);
+                             const UniqueId& load_id, const 
PTabletWriterAddBlockRequest& request);
 
     void _finish_load_channel(UniqueId load_id);
     // check if the total load mem consumption exceeds limit.
@@ -113,62 +112,4 @@ protected:
     Status _start_load_channels_clean();
 };
 
-template <typename Request>
-Status LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& 
channel, bool& is_eof,
-                                         const UniqueId& load_id, const 
Request& request) {
-    is_eof = false;
-    std::lock_guard<std::mutex> l(_lock);
-    auto it = _load_channels.find(load_id);
-    if (it == _load_channels.end()) {
-        auto handle = _last_success_channel->lookup(load_id.to_string());
-        // success only when eos be true
-        if (handle != nullptr) {
-            _last_success_channel->release(handle);
-            if (request.has_eos() && request.eos()) {
-                is_eof = true;
-                return Status::OK();
-            }
-        }
-        return Status::InternalError("fail to add batch in load channel. 
unknown load_id={}",
-                                     load_id.to_string());
-    }
-    channel = it->second;
-    return Status::OK();
-}
-
-template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
-Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request,
-                                 TabletWriterAddResult* response) {
-    UniqueId load_id(request.id());
-    // 1. get load channel
-    std::shared_ptr<LoadChannel> channel;
-    bool is_eof;
-    auto status = _get_load_channel(channel, is_eof, load_id, request);
-    if (!status.ok() || is_eof) {
-        return status;
-    }
-
-    if (!channel->is_high_priority()) {
-        // 2. check if mem consumption exceed limit
-        // If this is a high priority load task, do not handle this.
-        // because this may block for a while, which may lead to rpc timeout.
-        _handle_mem_exceed_limit();
-    }
-
-    // 3. add batch to load channel
-    // batch may not exist in request(eg: eos request without batch),
-    // this case will be handled in load channel's add batch method.
-    Status st = channel->add_batch(request, response);
-    if (UNLIKELY(!st.ok())) {
-        channel->cancel();
-        return st;
-    }
-
-    // 4. handle finish
-    if (channel->is_finished()) {
-        _finish_load_channel(load_id);
-    }
-    return Status::OK();
-}
-
 } // namespace doris
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 5571ab9a87..36737b67e4 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -345,9 +345,8 @@ std::ostream& operator<<(std::ostream& os, const 
TabletsChannelKey& key) {
     return os;
 }
 
-template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
-Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
-                                 TabletWriterAddResult* response) {
+Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
+                                 PTabletWriterAddBlockResult* response) {
     int64_t cur_seq = 0;
     _add_batch_number_counter->update(1);
 
@@ -501,8 +500,4 @@ bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) {
     std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
     return _broken_tablets.find(tablet_id) != _broken_tablets.end();
 }
-
-template Status
-TabletsChannel::add_batch<PTabletWriterAddBlockRequest, 
PTabletWriterAddBlockResult>(
-        PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*);
 } // namespace doris
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 66c512f3d5..3a897060a6 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/internal_service.pb.h>
 #include <glog/logging.h>
 
 #include <atomic>
@@ -89,8 +90,8 @@ public:
     Status open(const PTabletWriterOpenRequest& request);
 
     // no-op when this channel has been closed or cancelled
-    template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
-    Status add_batch(const TabletWriterAddRequest& request, 
TabletWriterAddResult* response);
+    Status add_batch(const PTabletWriterAddBlockRequest& request,
+                     PTabletWriterAddBlockResult* response);
 
     // Mark sender with 'sender_id' as closed.
     // If all senders are closed, close this channel, set '*finished' to true, 
update 'tablet_vec'


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to