gavinchou commented on code in PR #56116:
URL: https://github.com/apache/doris/pull/56116#discussion_r2371677282
##########
be/src/cloud/cloud_internal_service.cpp:
##########
@@ -120,6 +135,104 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
<< ", response=" << response->DebugString();
}
+
+void
CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController*
controller
+ [[maybe_unused]],
+ const PFetchPeerDataRequest*
request,
+ PFetchPeerDataResponse*
response,
+ google::protobuf::Closure*
done) {
+ brpc::ClosureGuard closure_guard(done);
+ g_file_cache_get_by_peer_num << 1;
+ if (!config::enable_file_cache) {
+ LOG_WARNING("try to access file cache data, but file cache not
enabled");
+ return;
+ }
+ int64_t begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ const auto type = request->type();
+ const auto& path = request->path();
+ response->mutable_status()->set_status_code(TStatusCode::OK);
+ if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
+ // Read specific range [file_offset, file_offset+file_size) across
cached blocks
+ auto datas =
io::FileCacheFactory::instance()->get_cache_data_by_path(path);
+ for (auto& cb : datas) {
+ *(response->add_datas()) = std::move(cb);
+ }
+ } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
+ // Multiple specific blocks
+ auto hash = io::BlockFileCache::hash(path);
+ auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
+ if (cache == nullptr) {
+ g_file_cache_get_by_peer_failed_num << 1;
+ response->mutable_status()->add_error_msgs("can't get file cache
instance");
+
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+ return;
+ }
+ io::CacheContext ctx {};
+ // ensure a valid stats pointer is provided to cache layer
+ io::ReadStatistics local_stats;
+ ctx.stats = &local_stats;
+ for (const auto& cb_req : request->cache_req()) {
+ size_t offset = static_cast<size_t>(std::max<int64_t>(0,
cb_req.block_offset()));
+ size_t size = static_cast<size_t>(std::max<int64_t>(0,
cb_req.block_size()));
+ auto holder = cache->get_or_set(hash, offset, size, ctx);
+ for (auto& fb : holder.file_blocks) {
+ g_file_cache_get_by_peer_blocks_num << 1;
+ doris::CacheBlock* out = response->add_datas();
+ out->set_block_offset(static_cast<int64_t>(fb->offset()));
+ out->set_block_size(static_cast<int64_t>(fb->range().size()));
+ std::string data;
+ data.resize(fb->range().size());
+ // Offload the file block read to a dedicated OS thread to
avoid bthread IO
+ Status read_st = Status::OK();
+ // due to file_reader.cpp:33] Check failed: bthread_self() == 0
+ int64_t begin_read_file_ts =
+ std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ auto task = [&] {
+ // Current thread not exist ThreadContext, usually after
the thread is started, using SCOPED_ATTACH_TASK macro to create a ThreadContext
and bind a Task.
+
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
+ Slice slice(data.data(), data.size());
+ read_st = fb->read(slice, /*read_offset=*/0);
+ };
+ AsyncIO::run_task(task, io::FileSystemType::LOCAL);
+ int64_t end_read_file_ts =
+ std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_read_cache_file_latency
+ << (end_read_file_ts - begin_read_file_ts);
+ if (read_st.ok()) {
+ out->set_data(std::move(data));
+ } else {
+ g_file_cache_get_by_peer_failed_num << 1;
+ LOG(WARNING) << "read cache block failed: " << read_st;
+ response->mutable_status()->add_error_msgs("read cache
file error");
+
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
+ return;
+ }
+ }
+ }
+ }
+ DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
+ int st_us = dp->param<int>("sleep", 1000);
+
LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep",
st_us);
+ // sleep us
+ bthread_usleep(st_us);
+ });
+
+ int64_t end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
Review Comment:
RPC latency is already recorded by the brpc framework, so this bvar is redundant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]