This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new c84463fe99a  [feature](cloud) Support path prefix for HDFS storage vault. (#32681)
c84463fe99a is described below

commit c84463fe99aa8faf2165816b67e906d06d31f5d1
Author: Shuo Wang <wangshuo...@gmail.com>
AuthorDate: Wed Mar 27 14:22:53 2024 +0800

    [feature](cloud) Support path prefix for HDFS storage vault. (#32681)
---
 be/src/cloud/cloud_meta_mgr.cpp                     | 16 ++------
 be/src/cloud/cloud_meta_mgr.h                       |  5 ++-
 be/src/cloud/cloud_storage_engine.cpp               | 17 +++++----
 be/src/io/hdfs_util.cpp                             | 26 +++++++++++++
 be/src/io/hdfs_util.h                               |  6 +++
 be/src/olap/storage_policy.cpp                      |  2 +-
 be/src/runtime/tablets_channel.cpp                  |  1 +
 .../org/apache/doris/catalog/HdfsStorageVault.java  | 43 +++++++++++-----------
 .../org/apache/doris/catalog/StorageVault.java      |  9 +----
 9 files changed, 72 insertions(+), 53 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index bfd15a3f24b..c636d92ed82 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -801,8 +801,7 @@ Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
     return retry_rpc("precommit txn", req, &res, &MetaService_Stub::precommit_txn);
 }
 
-Status CloudMetaMgr::get_storage_vault_info(
-        std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>>* vault_infos) {
+Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
     GetObjStoreInfoRequest req;
     GetObjStoreInfoResponse resp;
     req.set_cloud_unique_id(config::cloud_unique_id);
@@ -826,18 +825,9 @@ Status CloudMetaMgr::get_storage_vault_info(
         });
     }
     for (const auto& vault : resp.storage_vault()) {
-        THdfsParams params;
-        params.fs_name = vault.hdfs_info().build_conf().fs_name();
-        params.user = vault.hdfs_info().build_conf().user();
-        params.hdfs_kerberos_keytab = vault.hdfs_info().build_conf().hdfs_kerberos_keytab();
-        params.hdfs_kerberos_principal = vault.hdfs_info().build_conf().hdfs_kerberos_principal();
-        for (const auto& confs : vault.hdfs_info().build_conf().hdfs_confs()) {
-            THdfsConf conf;
-            conf.key = confs.key();
-            conf.value = confs.value();
-            params.hdfs_conf.emplace_back(std::move(conf));
+        if (vault.has_hdfs_info()) {
+            vault_infos->emplace_back(vault.id(), vault.hdfs_info());
         }
-        vault_infos->emplace_back(vault.id(), std::move(params));
     }
     return Status::OK();
 }
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index ede347af81c..909df671c3a 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -43,6 +43,8 @@ class TabletJobInfoPB;
 class TabletStatsPB;
 class TabletIndexPB;
 
+using StorageVaultInfos = std::vector<std::tuple<std::string, std::variant<S3Conf, HdfsVaultInfo>>>;
+
 Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency);
 
 class CloudMetaMgr {
@@ -70,8 +72,7 @@ public:
 
     Status precommit_txn(const StreamLoadContext& ctx);
 
-    Status get_storage_vault_info(
-            std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>>* vault_infos);
+    Status get_storage_vault_info(StorageVaultInfos* vault_infos);
 
     Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res);
 
diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index efff5a26adb..fd44adbc959 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -18,6 +18,7 @@
 #include "cloud/cloud_storage_engine.h"
 
 #include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/cloud.pb.h>
 #include <rapidjson/document.h>
 #include <rapidjson/encodings.h>
 #include <rapidjson/prettywriter.h>
@@ -34,6 +35,7 @@
 #include "io/fs/file_system.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/fs/s3_file_system.h"
+#include "io/hdfs_util.h"
 #include "olap/cumulative_compaction_policy.h"
 #include "olap/memtable_flush_executor.h"
 #include "olap/storage_policy.h"
@@ -82,9 +84,11 @@ struct VaultCreateFSVisitor {
     }
 
     // TODO(ByteYue): Make sure enable_java_support is on
-    Status operator()(const THdfsParams& hdfs_params) const {
-        auto fs = DORIS_TRY(
-                io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr));
+    Status operator()(const cloud::HdfsVaultInfo& vault) const {
+        auto hdfs_params = io::to_hdfs_params(vault);
+        auto fs =
+                DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr,
+                                                     vault.has_prefix() ? vault.prefix() : ""));
         put_storage_resource(id, {std::move(fs), 0});
         LOG_INFO("successfully create hdfs vault, vault id {}", id);
         return Status::OK();
@@ -107,7 +111,7 @@ struct RefreshFSVaultVisitor {
         return st;
     }
 
-    Status operator()(const THdfsParams& hdfs_params) const {
+    Status operator()(const cloud::HdfsVaultInfo& vault_info) const {
         // TODO(ByteYue): Implement the hdfs fs refresh logic
         return Status::OK();
     }
@@ -117,7 +121,7 @@ struct RefreshFSVaultVisitor {
 };
 
 Status CloudStorageEngine::open() {
-    std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>> vault_infos;
+    cloud::StorageVaultInfos vault_infos;
     do {
         auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
         if (st.ok()) {
@@ -133,7 +137,6 @@ Status CloudStorageEngine::open() {
 
     for (auto& [id, vault_info] : vault_infos) {
         RETURN_IF_ERROR(std::visit(VaultCreateFSVisitor {id}, vault_info));
     }
-    set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));
 
     // TODO(plat1ko): DeleteBitmapTxnManager
@@ -239,7 +242,7 @@ Status CloudStorageEngine::start_bg_threads() {
 void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
     while (!_stop_background_threads_latch.wait_for(
             std::chrono::seconds(config::refresh_s3_info_interval_s))) {
-        std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>> vault_infos;
+        cloud::StorageVaultInfos vault_infos;
         auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
         if (!st.ok()) {
            LOG(WARNING) << "failed to get storage vault info. err=" << st;
err=" << st; diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp index d6b491c8db9..21a9140ad4a 100644 --- a/be/src/io/hdfs_util.cpp +++ b/be/src/io/hdfs_util.cpp @@ -17,6 +17,8 @@ #include "io/hdfs_util.h" +#include <gen_cpp/cloud.pb.h> + #include <ostream> #include "common/logging.h" @@ -140,4 +142,28 @@ bool is_hdfs(const std::string& path_or_fs) { return path_or_fs.rfind("hdfs://") == 0; } +THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault) { + THdfsParams params; + auto build_conf = vault.build_conf(); + params.__set_fs_name(build_conf.fs_name()); + if (build_conf.has_user()) { + params.__set_user(build_conf.user()); + } + if (build_conf.has_hdfs_kerberos_principal()) { + params.__set_hdfs_kerberos_keytab(build_conf.hdfs_kerberos_principal()); + } + if (build_conf.has_hdfs_kerberos_keytab()) { + params.__set_hdfs_kerberos_principal(build_conf.hdfs_kerberos_keytab()); + } + std::vector<THdfsConf> tconfs; + for (const auto& confs : vault.build_conf().hdfs_confs()) { + THdfsConf conf; + conf.__set_key(confs.key()); + conf.__set_value(confs.value()); + tconfs.emplace_back(conf); + } + params.__set_hdfs_conf(tconfs); + return params; +} + } // namespace doris::io diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h index ccb4c9458ad..f1b236887d5 100644 --- a/be/src/io/hdfs_util.h +++ b/be/src/io/hdfs_util.h @@ -27,6 +27,10 @@ #include "io/fs/hdfs.h" #include "io/fs/path.h" +namespace cloud { +class HdfsVaultInfo; +} + namespace doris { class HDFSCommonBuilder; class THdfsParams; @@ -125,5 +129,7 @@ std::string get_fs_name(const std::string& path); // return true if path_or_fs contains "hdfs://" bool is_hdfs(const std::string& path_or_fs); +THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault); + } // namespace io } // namespace doris diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp index 4192610e2f0..6c08f7c49f8 100644 --- a/be/src/olap/storage_policy.cpp +++ b/be/src/olap/storage_policy.cpp @@ -43,7 +43,7 @@ Status get_remote_file_system(int64_t storage_policy_id, auto resource = get_storage_resource(storage_policy->resource_id); *fs = resource.fs; if (*fs == nullptr) { - return Status::NotFound<false>("could not find resource, resouce_id={}", + return Status::NotFound<false>("could not find resource, resource_id={}", storage_policy->resource_id); } return Status::OK(); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 38eb11e814b..7007f097e69 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -466,6 +466,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req .table_schema_param = _schema, .is_high_priority = _is_high_priority, .write_file_cache = request.write_file_cache(), + .storage_vault_id = request.storage_vault_id(), }; auto delta_writer = create_delta_writer(wrequest); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java index f1a50102eb6..6332acd04b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java @@ -21,12 +21,14 @@ import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.common.DdlException; import org.apache.doris.common.security.authentication.AuthenticationConfig; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Map;
+import java.util.Set;
 
 /**
  * HDFS resource
@@ -37,21 +39,31 @@ import java.util.Map;
  * (
  *     "type" = "hdfs",
  *     "fs.defaultFS" = "hdfs://10.220.147.151:8020",
- *     "fs.prefix" = "",
+ *     "path_prefix" = "/path/to/data",
  *     "hadoop.username" = "root"
 * );
 */
 public class HdfsStorageVault extends StorageVault {
     private static final Logger LOG = LogManager.getLogger(HdfsStorageVault.class);
+
+    public static final String VAULT_TYPE = "type";
     public static final String HADOOP_FS_PREFIX = "dfs.";
     public static String HADOOP_FS_NAME = "fs.defaultFS";
-    public static String HADOOP_PREFIX = "fs.prefix";
+    public static String VAULT_PATH_PREFIX = "path_prefix";
     public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
     public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
     public static String DSF_NAMESERVICES = "dfs.nameservices";
     public static final String HDFS_PREFIX = "hdfs:";
     public static final String HDFS_FILE_PREFIX = "hdfs://";
+    /**
+     * Property keys used by Doris, and should not be put in HDFS client configs,
+     * such as `type`, `path_prefix`, etc.
+     */
+    private static final Set<String> nonHdfsConfPropertyKeys = ImmutableSet.of(VAULT_TYPE, VAULT_PATH_PREFIX)
+            .stream().map(String::toLowerCase)
+            .collect(ImmutableSet.toImmutableSet());
+
     @SerializedName(value = "properties")
     private Map<String, String> properties;
 
@@ -67,26 +79,11 @@ public class HdfsStorageVault extends StorageVault {
         }
     }
 
-    @Override
-    protected void setProperties(Map<String, String> properties) throws DdlException {
-        // `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` should be both set to enable short circuit read.
-        // We should disable short circuit read if they are not both set because it will cause performance down.
-        if (!(enableShortCircuitRead(properties))) {
-            properties.put(HADOOP_SHORT_CIRCUIT, "false");
-        }
-        this.properties = properties;
-    }
-
     @Override
     public Map<String, String> getCopiedProperties() {
         return Maps.newHashMap(properties);
     }
 
-    public static boolean enableShortCircuitRead(Map<String, String> properties) {
-        return "true".equalsIgnoreCase(properties.getOrDefault(HADOOP_SHORT_CIRCUIT, "false"))
-                && properties.containsKey(HADOOP_SOCKET_PATH);
-    }
-
     public static Cloud.HdfsVaultInfo generateHdfsParam(Map<String, String> properties) {
         Cloud.HdfsVaultInfo.Builder hdfsVaultInfoBuilder = Cloud.HdfsVaultInfo.newBuilder();
 
@@ -94,7 +91,7 @@ public class HdfsStorageVault extends StorageVault {
         for (Map.Entry<String, String> property : properties.entrySet()) {
             if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
                 hdfsConfBuilder.setFsName(property.getValue());
-            } else if (property.getKey().equalsIgnoreCase(HADOOP_PREFIX)) {
+            } else if (property.getKey().equalsIgnoreCase(VAULT_PATH_PREFIX)) {
                 hdfsVaultInfoBuilder.setPrefix(property.getValue());
             } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_USER_NAME)) {
                 hdfsConfBuilder.setUser(property.getValue());
@@ -103,10 +100,12 @@ public class HdfsStorageVault extends StorageVault {
             } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB)) {
                 hdfsConfBuilder.setHdfsKerberosKeytab(property.getValue());
             } else {
-                Cloud.HdfsBuildConf.HdfsConfKVPair.Builder conf = Cloud.HdfsBuildConf.HdfsConfKVPair.newBuilder();
-                conf.setKey(property.getKey());
-                conf.setValue(property.getValue());
-                hdfsConfBuilder.addHdfsConfs(conf.build());
+                if (!nonHdfsConfPropertyKeys.contains(property.getKey().toLowerCase())) {
+                    Cloud.HdfsBuildConf.HdfsConfKVPair.Builder conf = Cloud.HdfsBuildConf.HdfsConfKVPair.newBuilder();
+                    conf.setKey(property.getKey());
+                    conf.setValue(property.getValue());
+                    hdfsConfBuilder.addHdfsConfs(conf.build());
+                }
             }
         }
         return hdfsVaultInfoBuilder.setBuildConf(hdfsConfBuilder.build()).build();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java
index 98f35ac1695..f6e0525d17b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java
@@ -84,9 +84,7 @@ public abstract class StorageVault {
     }
 
     public static StorageVault fromStmt(CreateStorageVaultStmt stmt) throws DdlException {
-        StorageVault storageVault = getStorageVaultInstance(stmt);
-        storageVault.setProperties(stmt.getProperties());
-        return storageVault;
+        return getStorageVaultInstance(stmt);
     }
 
     public boolean ifNotExists() {
@@ -153,10 +151,5 @@ public abstract class StorageVault {
         }
     }
 
-    /**
-     * Set and check the properties in child resources
-     */
-    protected abstract void setProperties(Map<String, String> properties) throws DdlException;
-
     public abstract Map<String, String> getCopiedProperties();
 }
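For illustration, a vault that uses the new property might be created as follows; the property names and values mirror the Javadoc above, while the vault name and the surrounding DDL form are assumptions rather than part of this commit:

    CREATE STORAGE VAULT IF NOT EXISTS hdfs_demo_vault
    PROPERTIES
    (
        "type" = "hdfs",
        "fs.defaultFS" = "hdfs://10.220.147.151:8020",
        "path_prefix" = "/path/to/data",
        "hadoop.username" = "root"
    );

As the VaultCreateFSVisitor change above shows, the prefix is forwarded to io::HdfsFileSystem::create as the vault's root path, so data written through this vault is placed under /path/to/data instead of the filesystem root.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org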