gavinchou commented on code in PR #56384:
URL: https://github.com/apache/doris/pull/56384#discussion_r2463938694
##########
be/src/io/cache/cached_remote_file_reader.cpp:
##########
@@ -131,6 +150,124 @@ std::pair<size_t, size_t>
CachedRemoteFileReader::s_align_size(size_t offset, si
return std::make_pair(align_left, align_size);
}
+namespace {
+std::optional<int64_t> extract_tablet_id(const std::string& file_path) {
+ return StorageResource::parse_tablet_id_from_path(file_path);
+}
+
+// Get peer connection info from tablet_id
+std::pair<std::string, int> get_peer_connection_info(const std::string&
file_path) {
+ std::string host = "";
+ int port = 0;
+
+ // Try to get tablet_id from actual path and lookup tablet info
+ if (auto tablet_id = extract_tablet_id(file_path)) {
+ auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+ if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) {
+ host = tablet_info->first;
+ port = tablet_info->second;
+ } else {
+ LOG_WARNING("get peer connection info not found")
+ .tag("tablet_id", *tablet_id)
+ .tag("file_path", file_path);
+ }
+ } else {
+ LOG_WARNING("parse tablet id from path failed")
+ .tag("tablet_id", "null")
+ .tag("file_path", file_path);
+ }
+
+ DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
+ host = dp->param<std::string>("host", "127.0.0.1");
+ port = dp->param("port", 9060);
+ LOG_WARNING("debug point
PeerFileCacheReader::_fetch_from_peer_cache_blocks")
+ .tag("host", host)
+ .tag("port", port);
+ });
+
+ return {host, port};
+}
+
+// Execute peer read with fallback to S3
+Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks,
size_t empty_start,
+ size_t& size, std::unique_ptr<char[]>& buffer,
+ const std::string& file_path, bool is_doris_table,
ReadStatistics& stats,
+ const IOContext* io_ctx) {
+ SCOPED_RAW_TIMER(&stats.remote_read_timer);
+
+ auto [host, port] = get_peer_connection_info(file_path);
Review Comment:
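在什么情况下会找不到 peer？
这些 peer 是属于另一个 cg（compute group）的，还是同一个 cg 的？
(EN: In which cases can no peer be found? Do these peers belong to another compute group or to the same one?)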
##########
be/src/io/cache/cached_remote_file_reader.cpp:
##########
@@ -131,6 +150,124 @@ std::pair<size_t, size_t>
CachedRemoteFileReader::s_align_size(size_t offset, si
return std::make_pair(align_left, align_size);
}
+namespace {
+std::optional<int64_t> extract_tablet_id(const std::string& file_path) {
+ return StorageResource::parse_tablet_id_from_path(file_path);
+}
+
+// Get peer connection info from tablet_id
+std::pair<std::string, int> get_peer_connection_info(const std::string&
file_path) {
+ std::string host = "";
+ int port = 0;
+
+ // Try to get tablet_id from actual path and lookup tablet info
+ if (auto tablet_id = extract_tablet_id(file_path)) {
+ auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+ if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) {
+ host = tablet_info->first;
+ port = tablet_info->second;
+ } else {
+ LOG_WARNING("get peer connection info not found")
+ .tag("tablet_id", *tablet_id)
+ .tag("file_path", file_path);
+ }
+ } else {
+ LOG_WARNING("parse tablet id from path failed")
+ .tag("tablet_id", "null")
+ .tag("file_path", file_path);
+ }
+
+ DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
+ host = dp->param<std::string>("host", "127.0.0.1");
+ port = dp->param("port", 9060);
+ LOG_WARNING("debug point
PeerFileCacheReader::_fetch_from_peer_cache_blocks")
+ .tag("host", host)
+ .tag("port", port);
+ });
+
+ return {host, port};
+}
+
+// Execute peer read with fallback to S3
+Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks,
size_t empty_start,
+ size_t& size, std::unique_ptr<char[]>& buffer,
+ const std::string& file_path, bool is_doris_table,
ReadStatistics& stats,
+ const IOContext* io_ctx) {
+ SCOPED_RAW_TIMER(&stats.remote_read_timer);
+
+ auto [host, port] = get_peer_connection_info(file_path);
+ VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ",
port=" << port
+ << ", file_path=" << file_path;
+
+ if (host.empty() || port == 0) {
+ LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is
empty"
+ << ", host=" << host << ", port=" << port
+ << ", file_path=" << file_path;
+ return Status::InternalError("host or port is empty");
+ }
+ peer_read_counter << 1;
+ PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
+ auto st = peer_reader.fetch_blocks(empty_blocks, empty_start,
Slice(buffer.get(), size), &size,
+ io_ctx);
+ if (!st.ok()) {
+ LOG_WARNING("PeerFileCacheReader read from peer failed")
+ .tag("host", host)
+ .tag("port", port)
+ .tag("error", st.msg());
+ }
+ return st;
+}
+
+// Execute S3 read
+Status execute_s3_read(size_t empty_start, size_t& size,
std::unique_ptr<char[]>& buffer,
+ ReadStatistics& stats, const IOContext* io_ctx,
+ FileReaderSPtr remote_file_reader) {
+ s3_read_counter << 1;
+ SCOPED_RAW_TIMER(&stats.remote_read_timer);
+ return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size),
&size, io_ctx);
+}
+
+} // anonymous namespace
+
+Status CachedRemoteFileReader::_execute_remote_read(const
std::vector<FileBlockSPtr>& empty_blocks,
+ size_t empty_start,
size_t& size,
+ std::unique_ptr<char[]>&
buffer,
+ ReadStatistics& stats,
+ const IOContext* io_ctx) {
+ DBUG_EXECUTE_IF("CachedRemoteFileReader.read_at_impl.change_type", {
+ // Determine read type from debug point or default to S3
+ std::string read_type = "s3";
+ read_type = dp->param<std::string>("type", "s3");
+ LOG_WARNING("CachedRemoteFileReader.read_at_impl.change_type")
+ .tag("path", path().native())
+ .tag("off", empty_start)
+ .tag("size", size)
+ .tag("type", read_type);
+ // Execute appropriate read strategy
+ if (read_type == "s3") {
+ return execute_s3_read(empty_start, size, buffer, stats, io_ctx,
_remote_file_reader);
+ } else {
+ return execute_peer_read(empty_blocks, empty_start, size, buffer,
path().native(),
+ _is_doris_table, stats, io_ctx);
+ }
+ });
+
+ if (!_is_doris_table || !doris::config::enable_cache_read_from_peer) {
+ return execute_s3_read(empty_start, size, buffer, stats, io_ctx,
_remote_file_reader);
+ } else {
+ // first try peer read, if peer failed, fallback to S3
+ // peer timeout is 5 seconds
+ // TODO(dx): here peer and s3 reader need to get data in parallel, and
take the one that is correct and returns first
+ auto st = execute_peer_read(empty_blocks, empty_start, size, buffer,
path().native(),
Review Comment:
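只要打开 doris::config::enable_cache_read_from_peer，就一定要先从 peer 读吗？
是否还需要其他辅助条件来判断能不能从 peer 读？
(EN: Once doris::config::enable_cache_read_from_peer is enabled, must we always read from the peer first? Shouldn't there be additional conditions that decide whether a peer read is allowed?)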
##########
be/src/cloud/cloud_internal_service.cpp:
##########
@@ -154,6 +169,112 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
<< ", response=" << response->DebugString();
}
+
+void
CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController*
controller
+ [[maybe_unused]],
+ const PFetchPeerDataRequest*
request,
+ PFetchPeerDataResponse*
response,
+ google::protobuf::Closure*
done) {
+ brpc::ClosureGuard closure_guard(done);
Review Comment:
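Bug：这里的 done 需要交给异步线程池，在池中的任务执行完成后再调用 done->Run()，而不是由 ClosureGuard 在当前 brpc 线程退出作用域时执行。
(EN: Bug: `done` must be handed off to an async thread pool, and `done->Run()` called there once the work finishes, instead of being run by the ClosureGuard on the brpc thread at scope exit.)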
##########
be/src/io/cache/cached_remote_file_reader.cpp:
##########
@@ -131,6 +150,124 @@ std::pair<size_t, size_t>
CachedRemoteFileReader::s_align_size(size_t offset, si
return std::make_pair(align_left, align_size);
}
+namespace {
+std::optional<int64_t> extract_tablet_id(const std::string& file_path) {
+ return StorageResource::parse_tablet_id_from_path(file_path);
+}
+
+// Get peer connection info from tablet_id
+std::pair<std::string, int> get_peer_connection_info(const std::string&
file_path) {
+ std::string host = "";
+ int port = 0;
+
+ // Try to get tablet_id from actual path and lookup tablet info
+ if (auto tablet_id = extract_tablet_id(file_path)) {
+ auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+ if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) {
+ host = tablet_info->first;
+ port = tablet_info->second;
+ } else {
+ LOG_WARNING("get peer connection info not found")
+ .tag("tablet_id", *tablet_id)
+ .tag("file_path", file_path);
+ }
+ } else {
+ LOG_WARNING("parse tablet id from path failed")
+ .tag("tablet_id", "null")
+ .tag("file_path", file_path);
+ }
+
+ DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
+ host = dp->param<std::string>("host", "127.0.0.1");
+ port = dp->param("port", 9060);
+ LOG_WARNING("debug point
PeerFileCacheReader::_fetch_from_peer_cache_blocks")
+ .tag("host", host)
+ .tag("port", port);
+ });
+
+ return {host, port};
+}
+
+// Execute peer read with fallback to S3
+Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks,
size_t empty_start,
+ size_t& size, std::unique_ptr<char[]>& buffer,
+ const std::string& file_path, bool is_doris_table,
ReadStatistics& stats,
+ const IOContext* io_ctx) {
+ SCOPED_RAW_TIMER(&stats.remote_read_timer);
+
+ auto [host, port] = get_peer_connection_info(file_path);
+ VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ",
port=" << port
+ << ", file_path=" << file_path;
+
+ if (host.empty() || port == 0) {
+ LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is
empty"
Review Comment:
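这里也加一个 bvar，统计因 host 或 port 为空而无法从 peer 读取的次数。
(EN: Add a bvar here as well, counting how often the peer read is skipped because host or port is empty.)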
##########
be/src/io/cache/block_file_cache_downloader.cpp:
##########
@@ -169,6 +172,13 @@ std::unordered_map<std::string, RowsetMetaSharedPtr>
snapshot_rs_metas(BaseTable
return id_to_rowset_meta_map;
}
+static void CleanUpExpiredMappings(void* arg) {
Review Comment:
Bad code style: function names should use lower_snake_case,
e.g. `clean_up_expired_mappings` instead of `CleanUpExpiredMappings`.
##########
be/src/io/cache/cached_remote_file_reader.cpp:
##########
@@ -131,6 +150,124 @@ std::pair<size_t, size_t>
CachedRemoteFileReader::s_align_size(size_t offset, si
return std::make_pair(align_left, align_size);
}
+namespace {
+std::optional<int64_t> extract_tablet_id(const std::string& file_path) {
+ return StorageResource::parse_tablet_id_from_path(file_path);
+}
+
+// Get peer connection info from tablet_id
+std::pair<std::string, int> get_peer_connection_info(const std::string&
file_path) {
+ std::string host = "";
+ int port = 0;
+
+ // Try to get tablet_id from actual path and lookup tablet info
+ if (auto tablet_id = extract_tablet_id(file_path)) {
+ auto& manager =
ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
+ if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) {
+ host = tablet_info->first;
+ port = tablet_info->second;
+ } else {
+ LOG_WARNING("get peer connection info not found")
+ .tag("tablet_id", *tablet_id)
+ .tag("file_path", file_path);
+ }
+ } else {
+ LOG_WARNING("parse tablet id from path failed")
+ .tag("tablet_id", "null")
+ .tag("file_path", file_path);
+ }
+
+ DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
+ host = dp->param<std::string>("host", "127.0.0.1");
+ port = dp->param("port", 9060);
+ LOG_WARNING("debug point
PeerFileCacheReader::_fetch_from_peer_cache_blocks")
+ .tag("host", host)
+ .tag("port", port);
+ });
+
+ return {host, port};
+}
+
+// Execute peer read with fallback to S3
+Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks,
size_t empty_start,
+ size_t& size, std::unique_ptr<char[]>& buffer,
+ const std::string& file_path, bool is_doris_table,
ReadStatistics& stats,
+ const IOContext* io_ctx) {
+ SCOPED_RAW_TIMER(&stats.remote_read_timer);
+
+ auto [host, port] = get_peer_connection_info(file_path);
Review Comment:
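这里的 peer 选择策略需要描述得更清楚一些（例如在注释中说明如何选出 peer）。
(EN: The peer-selection strategy here needs to be documented more clearly, e.g. a comment explaining how the peer is chosen.)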
##########
be/src/cloud/cloud_internal_service.cpp:
##########
@@ -154,6 +169,112 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
VLOG_DEBUG << "warm up get meta request=" << request->DebugString()
<< ", response=" << response->DebugString();
}
+
+void
CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController*
controller
Review Comment:
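这个 RPC 的实现需要放到线程池里执行，可以在单独的 PR 中修改。
(EN: This RPC handler should run in a thread pool; that can be addressed in a separate PR.)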
##########
be/src/cloud/config.h:
##########
@@ -175,5 +175,9 @@ DECLARE_mBool(enable_standby_passive_compaction);
DECLARE_mDouble(standby_compaction_version_ratio);
+DECLARE_mBool(enable_cache_read_from_peer);
+
+DECLARE_mInt64(cache_read_from_peer_expired_seconds);
Review Comment:
Needs a comment explaining what this config controls and how the value (in seconds) is used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]