gavinchou commented on code in PR #31628: URL: https://github.com/apache/doris/pull/31628#discussion_r1520854955
########## be/src/cloud/cloud_storage_engine.cpp: ########## @@ -189,51 +193,93 @@ Status CloudStorageEngine::start_bg_threads() { return Status::OK(); } -void CloudStorageEngine::_refresh_s3_info_thread_callback() { +struct VaultCreateFSVisitor { + VaultCreateFSVisitor(const std::string& id) : id(id) {} + void operator()(S3Conf s3_conf) const { + LOG(INFO) << "get new s3 info: " << s3_conf.to_string() << " resource_id=" << id; + std::shared_ptr<io::S3FileSystem> s3_fs; + auto st = io::S3FileSystem::create(std::move(s3_conf), id, &s3_fs); + if (!st.ok()) { + LOG(WARNING) << "failed to create s3 fs. id=" << id; + return; + } + + st = s3_fs->connect(); + if (!st.ok()) { + LOG(WARNING) << "failed to connect s3 fs. id=" << id; + return; + } + + put_storage_resource(std::atol(id.data()), {s3_fs, 0}); + } + + void operator()(THdfsParams hdfs_params) const { + std::shared_ptr<io::HdfsFileSystem> hdfs_fs; + auto st = io::HdfsFileSystem::create(hdfs_params, id, "", nullptr, &hdfs_fs); + if (!st.ok()) { + LOG(WARNING) << "failed to create s3 fs. id=" << id; + return; + } + + st = hdfs_fs->connect(); + if (!st.ok()) { + LOG(WARNING) << "failed to connect s3 fs. id=" << id; + return; + } + + put_storage_resource(std::atol(id.data()), {hdfs_fs, 0}); + } + const std::string& id; +}; + +struct RefreshFSVaultVisitor { + RefreshFSVaultVisitor(std::string_view id, std::shared_ptr<doris::io::FileSystem> fs) + : id(id), fs(std::move(fs)) {} + void operator()(S3Conf s3_conf) const { + auto s3_fs = std::reinterpret_pointer_cast<io::S3FileSystem>(fs); + if (s3_fs->s3_conf().ak != s3_conf.ak || s3_fs->s3_conf().sk != s3_conf.sk || + s3_fs->s3_conf().sse_enabled != s3_conf.sse_enabled) { + auto cur_s3_conf = s3_fs->s3_conf(); + LOG(INFO) << "update s3 info, old: " << cur_s3_conf.to_string() + << " new: " << s3_conf.to_string() << " resource_id=" << id; + cur_s3_conf.ak = s3_conf.ak; + cur_s3_conf.sk = s3_conf.sk; + cur_s3_conf.sse_enabled = s3_conf.sse_enabled; + s3_fs->set_conf(std::move(cur_s3_conf)); + auto st = s3_fs->connect(); + if (!st.ok()) { + LOG(WARNING) << "failed to connect s3 fs. id=" << id; + } + } + } + + void operator()(THdfsParams hdfs_params) const { + // TODO(ByteYue): Implmente the hdfs fs refresh logic + } + std::string_view id; + std::shared_ptr<doris::io::FileSystem> fs; +}; + +void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() { while (!_stop_background_threads_latch.wait_for( std::chrono::seconds(config::refresh_s3_info_interval_s))) { - std::vector<std::tuple<std::string, S3Conf>> s3_infos; - auto st = _meta_mgr->get_s3_info(&s3_infos); + std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>> s3_infos; + auto st = _meta_mgr->get_storage_vault_info(&s3_infos); if (!st.ok()) { LOG(WARNING) << "failed to refresh object store info. err=" << st; continue; } CHECK(!s3_infos.empty()) << "no s3 infos"; - for (auto& [id, s3_conf] : s3_infos) { + for (auto& [id, vault_info] : s3_infos) { auto fs = get_filesystem(id); + auto s3_conf = std::get<S3Conf>(vault_info); Review Comment: unused var? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org