This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3899c08036 [optimize](compile) remove unused template param from load channel (#18980) 3899c08036 is described below commit 3899c0803661c41cdf1f9159644f2a8b3c2a1b10 Author: yiguolei <676222...@qq.com> AuthorDate: Mon Apr 24 23:36:47 2023 +0800 [optimize](compile) remove unused template param from load channel (#18980) * [optimize](compile) remove unused template param from load channel --------- Co-authored-by: yiguolei <yiguo...@gmail.com> --- be/src/runtime/load_channel.cpp | 54 ++++++++++++++++++++++++++++++ be/src/runtime/load_channel.h | 64 +++-------------------------------- be/src/runtime/load_channel_mgr.cpp | 57 +++++++++++++++++++++++++++++++ be/src/runtime/load_channel_mgr.h | 67 +++---------------------------------- be/src/runtime/tablets_channel.cpp | 9 ++--- be/src/runtime/tablets_channel.h | 5 +-- 6 files changed, 124 insertions(+), 132 deletions(-) diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index b29f760486..507a214cc7 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -104,6 +104,60 @@ Status LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& channe return Status::OK(); } +Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request, + PTabletWriterAddBlockResult* response) { + int64_t index_id = request.index_id(); + // 1. get tablets channel + std::shared_ptr<TabletsChannel> channel; + bool is_finished; + Status st = _get_tablets_channel(channel, is_finished, index_id); + if (!st.ok() || is_finished) { + return st; + } + + // 2. add block to tablets channel + if (request.has_block()) { + RETURN_IF_ERROR(channel->add_batch(request, response)); + _add_batch_number_counter->update(1); + } + + // 3. handle eos + if (request.has_eos() && request.eos()) { + st = _handle_eos(channel, request, response); + _report_profile(response); + if (!st.ok()) { + return st; + } + } else if (_add_batch_number_counter->value() % 10 == 1) { + _report_profile(response); + } + _last_updated_time.store(time(nullptr)); + return st; +} + +void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) { + COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption()); + // TabletSink and LoadChannel in BE are M: N relationship, + // Every once in a while LoadChannel will randomly return its own runtime profile to a TabletSink, + // so usually all LoadChannel runtime profiles are saved on each TabletSink, + // and the timeliness of the same LoadChannel profile saved on different TabletSinks is different, + // and each TabletSink will periodically send fe reports all the LoadChannel profiles saved by itself, + // and ensures to update the latest LoadChannel profile according to the timestamp. + _self_profile->set_timestamp(_last_updated_time); + + TRuntimeProfileTree tprofile; + _profile->to_thrift(&tprofile); + ThriftSerializer ser(false, 4096); + uint8_t* buf = nullptr; + uint32_t len = 0; + auto st = ser.serialize(&tprofile, &len, &buf); + if (st.ok()) { + response->set_load_channel_profile(std::string((const char*)buf, len)); + } else { + LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, errmsg=" << st; + } +} + bool LoadChannel::is_finished() { if (!_opened) { return false; diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index dc23defc4a..7f54480743 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -17,6 +17,7 @@ #pragma once +#include <gen_cpp/internal_service.pb.h> #include <stdint.h> #include <time.h> @@ -57,8 +58,8 @@ public: Status open(const PTabletWriterOpenRequest& request); // this batch must belong to a index in one transaction - template <typename TabletWriterAddRequest, typename TabletWriterAddResult> - Status add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response); + Status add_batch(const PTabletWriterAddBlockRequest& request, + PTabletWriterAddBlockResult* response); // return true if this load channel has been opened and all tablets channels are closed then. bool is_finished(); @@ -138,9 +139,8 @@ protected: } void _init_profile(); - template <typename TabletWriterAddResult> // thread safety - void _report_profile(TabletWriterAddResult* response); + void _report_profile(PTabletWriterAddBlockResult* response); private: UniqueId _load_id; @@ -177,62 +177,6 @@ private: int64_t _backend_id; }; -template <typename TabletWriterAddRequest, typename TabletWriterAddResult> -Status LoadChannel::add_batch(const TabletWriterAddRequest& request, - TabletWriterAddResult* response) { - int64_t index_id = request.index_id(); - // 1. get tablets channel - std::shared_ptr<TabletsChannel> channel; - bool is_finished; - Status st = _get_tablets_channel(channel, is_finished, index_id); - if (!st.ok() || is_finished) { - return st; - } - - // 2. add block to tablets channel - if (request.has_block()) { - RETURN_IF_ERROR(channel->add_batch(request, response)); - _add_batch_number_counter->update(1); - } - - // 3. handle eos - if (request.has_eos() && request.eos()) { - st = _handle_eos(channel, request, response); - _report_profile<TabletWriterAddResult>(response); - if (!st.ok()) { - return st; - } - } else if (_add_batch_number_counter->value() % 10 == 1) { - _report_profile<TabletWriterAddResult>(response); - } - _last_updated_time.store(time(nullptr)); - return st; -} - -template <typename TabletWriterAddResult> -void LoadChannel::_report_profile(TabletWriterAddResult* response) { - COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption()); - // TabletSink and LoadChannel in BE are M: N relationship, - // Every once in a while LoadChannel will randomly return its own runtime profile to a TabletSink, - // so usually all LoadChannel runtime profiles are saved on each TabletSink, - // and the timeliness of the same LoadChannel profile saved on different TabletSinks is different, - // and each TabletSink will periodically send fe reports all the LoadChannel profiles saved by itself, - // and ensures to update the latest LoadChannel profile according to the timestamp. - _self_profile->set_timestamp(_last_updated_time); - - TRuntimeProfileTree tprofile; - _profile->to_thrift(&tprofile); - ThriftSerializer ser(false, 4096); - uint8_t* buf = nullptr; - uint32_t len = 0; - auto st = ser.serialize(&tprofile, &len, &buf); - if (st.ok()) { - response->set_load_channel_profile(std::string((const char*)buf, len)); - } else { - LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, errmsg=" << st; - } -} - inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) { os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << load_channel.mem_consumption() << ", last_update_time=" << static_cast<uint64_t>(load_channel.last_updated_time()) diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 068ce2261c..597423598c 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -136,6 +136,63 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { static void dummy_deleter(const CacheKey& key, void* value) {} +Status LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof, + const UniqueId& load_id, + const PTabletWriterAddBlockRequest& request) { + is_eof = false; + std::lock_guard<std::mutex> l(_lock); + auto it = _load_channels.find(load_id); + if (it == _load_channels.end()) { + auto handle = _last_success_channel->lookup(load_id.to_string()); + // success only when eos be true + if (handle != nullptr) { + _last_success_channel->release(handle); + if (request.has_eos() && request.eos()) { + is_eof = true; + return Status::OK(); + } + } + return Status::InternalError("fail to add batch in load channel. unknown load_id={}", + load_id.to_string()); + } + channel = it->second; + return Status::OK(); +} + +Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request, + PTabletWriterAddBlockResult* response) { + UniqueId load_id(request.id()); + // 1. get load channel + std::shared_ptr<LoadChannel> channel; + bool is_eof; + auto status = _get_load_channel(channel, is_eof, load_id, request); + if (!status.ok() || is_eof) { + return status; + } + + if (!channel->is_high_priority()) { + // 2. check if mem consumption exceed limit + // If this is a high priority load task, do not handle this. + // because this may block for a while, which may lead to rpc timeout. + _handle_mem_exceed_limit(); + } + + // 3. add batch to load channel + // batch may not exist in request(eg: eos request without batch), + // this case will be handled in load channel's add batch method. + Status st = channel->add_batch(request, response); + if (UNLIKELY(!st.ok())) { + channel->cancel(); + return st; + } + + // 4. handle finish + if (channel->is_finished()) { + _finish_load_channel(load_id); + } + return Status::OK(); +} + void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) { VLOG_NOTICE << "removing load channel " << load_id << " because it's finished"; { diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 69112724b5..cad611b75b 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -17,6 +17,7 @@ #pragma once +#include <gen_cpp/internal_service.pb.h> #include <stdint.h> #include <condition_variable> @@ -24,7 +25,6 @@ #include <mutex> #include <unordered_map> #include <utility> - // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" @@ -54,8 +54,8 @@ public: // open a new load channel if not exist Status open(const PTabletWriterOpenRequest& request); - template <typename TabletWriterAddRequest, typename TabletWriterAddResult> - Status add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response); + Status add_batch(const PTabletWriterAddBlockRequest& request, + PTabletWriterAddBlockResult* response); // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); @@ -67,9 +67,8 @@ public: MemTrackerLimiter* mem_tracker() { return _mem_tracker.get(); } private: - template <typename Request> Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof, - const UniqueId& load_id, const Request& request); + const UniqueId& load_id, const PTabletWriterAddBlockRequest& request); void _finish_load_channel(UniqueId load_id); // check if the total load mem consumption exceeds limit. @@ -113,62 +112,4 @@ protected: Status _start_load_channels_clean(); }; -template <typename Request> -Status LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof, - const UniqueId& load_id, const Request& request) { - is_eof = false; - std::lock_guard<std::mutex> l(_lock); - auto it = _load_channels.find(load_id); - if (it == _load_channels.end()) { - auto handle = _last_success_channel->lookup(load_id.to_string()); - // success only when eos be true - if (handle != nullptr) { - _last_success_channel->release(handle); - if (request.has_eos() && request.eos()) { - is_eof = true; - return Status::OK(); - } - } - return Status::InternalError("fail to add batch in load channel. unknown load_id={}", - load_id.to_string()); - } - channel = it->second; - return Status::OK(); -} - -template <typename TabletWriterAddRequest, typename TabletWriterAddResult> -Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request, - TabletWriterAddResult* response) { - UniqueId load_id(request.id()); - // 1. get load channel - std::shared_ptr<LoadChannel> channel; - bool is_eof; - auto status = _get_load_channel(channel, is_eof, load_id, request); - if (!status.ok() || is_eof) { - return status; - } - - if (!channel->is_high_priority()) { - // 2. check if mem consumption exceed limit - // If this is a high priority load task, do not handle this. - // because this may block for a while, which may lead to rpc timeout. - _handle_mem_exceed_limit(); - } - - // 3. add batch to load channel - // batch may not exist in request(eg: eos request without batch), - // this case will be handled in load channel's add batch method. - Status st = channel->add_batch(request, response); - if (UNLIKELY(!st.ok())) { - channel->cancel(); - return st; - } - - // 4. handle finish - if (channel->is_finished()) { - _finish_load_channel(load_id); - } - return Status::OK(); -} - } // namespace doris diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 5571ab9a87..36737b67e4 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -345,9 +345,8 @@ std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) { return os; } -template <typename TabletWriterAddRequest, typename TabletWriterAddResult> -Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, - TabletWriterAddResult* response) { +Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, + PTabletWriterAddBlockResult* response) { int64_t cur_seq = 0; _add_batch_number_counter->update(1); @@ -501,8 +500,4 @@ bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) { std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock); return _broken_tablets.find(tablet_id) != _broken_tablets.end(); } - -template Status -TabletsChannel::add_batch<PTabletWriterAddBlockRequest, PTabletWriterAddBlockResult>( - PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*); } // namespace doris diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 66c512f3d5..3a897060a6 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -17,6 +17,7 @@ #pragma once +#include <gen_cpp/internal_service.pb.h> #include <glog/logging.h> #include <atomic> @@ -89,8 +90,8 @@ public: Status open(const PTabletWriterOpenRequest& request); // no-op when this channel has been closed or cancelled - template <typename TabletWriterAddRequest, typename TabletWriterAddResult> - Status add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response); + Status add_batch(const PTabletWriterAddBlockRequest& request, + PTabletWriterAddBlockResult* response); // Mark sender with 'sender_id' as closed. // If all senders are closed, close this channel, set '*finished' to true, update 'tablet_vec' --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org