This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c84463fe99a [feature](cloud) Support path prefix for HDFS storage vault. (#32681)
c84463fe99a is described below

commit c84463fe99aa8faf2165816b67e906d06d31f5d1
Author: Shuo Wang <wangshuo...@gmail.com>
AuthorDate: Wed Mar 27 14:22:53 2024 +0800

    [feature](cloud) Support path prefix for HDFS storage vault. (#32681)
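    
    This change lets an HDFS storage vault carry a `path_prefix` property
    (replacing the earlier `fs.prefix` key), so data written through the
    vault is rooted under a fixed HDFS directory. A minimal usage sketch,
    assuming the property names from the HdfsStorageVault Javadoc in this
    patch; the vault name and exact CREATE syntax are illustrative:
    
        CREATE STORAGE VAULT "hdfs_vault"
        (
            "type" = "hdfs",
            "fs.defaultFS" = "hdfs://10.220.147.151:8020",
            "path_prefix" = "/path/to/data",
            "hadoop.username" = "root"
        );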
---
 be/src/cloud/cloud_meta_mgr.cpp                    | 16 ++------
 be/src/cloud/cloud_meta_mgr.h                      |  5 ++-
 be/src/cloud/cloud_storage_engine.cpp              | 17 +++++----
 be/src/io/hdfs_util.cpp                            | 26 +++++++++++++
 be/src/io/hdfs_util.h                              |  6 +++
 be/src/olap/storage_policy.cpp                     |  2 +-
 be/src/runtime/tablets_channel.cpp                 |  1 +
 .../org/apache/doris/catalog/HdfsStorageVault.java | 43 +++++++++++-----------
 .../org/apache/doris/catalog/StorageVault.java     |  9 +----
 9 files changed, 72 insertions(+), 53 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index bfd15a3f24b..c636d92ed82 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -801,8 +801,7 @@ Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
     return retry_rpc("precommit txn", req, &res, &MetaService_Stub::precommit_txn);
 }
 
-Status CloudMetaMgr::get_storage_vault_info(
-        std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>>* vault_infos) {
+Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
     GetObjStoreInfoRequest req;
     GetObjStoreInfoResponse resp;
     req.set_cloud_unique_id(config::cloud_unique_id);
@@ -826,18 +825,9 @@ Status CloudMetaMgr::get_storage_vault_info(
                                   });
     }
     for (const auto& vault : resp.storage_vault()) {
-        THdfsParams params;
-        params.fs_name = vault.hdfs_info().build_conf().fs_name();
-        params.user = vault.hdfs_info().build_conf().user();
-        params.hdfs_kerberos_keytab = vault.hdfs_info().build_conf().hdfs_kerberos_keytab();
-        params.hdfs_kerberos_principal = vault.hdfs_info().build_conf().hdfs_kerberos_principal();
-        for (const auto& confs : vault.hdfs_info().build_conf().hdfs_confs()) {
-            THdfsConf conf;
-            conf.key = confs.key();
-            conf.value = confs.value();
-            params.hdfs_conf.emplace_back(std::move(conf));
+        if (vault.has_hdfs_info()) {
+            vault_infos->emplace_back(vault.id(), vault.hdfs_info());
         }
-        vault_infos->emplace_back(vault.id(), std::move(params));
     }
     return Status::OK();
 }
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index ede347af81c..909df671c3a 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -43,6 +43,8 @@ class TabletJobInfoPB;
 class TabletStatsPB;
 class TabletIndexPB;
 
+using StorageVaultInfos = std::vector<std::tuple<std::string, std::variant<S3Conf, HdfsVaultInfo>>>;
+
 Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency);
 
 class CloudMetaMgr {
@@ -70,8 +72,7 @@ public:
 
     Status precommit_txn(const StreamLoadContext& ctx);
 
-    Status get_storage_vault_info(
-            std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>>* vault_infos);
+    Status get_storage_vault_info(StorageVaultInfos* vault_infos);
 
     Status prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res);
 
diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index efff5a26adb..fd44adbc959 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -18,6 +18,7 @@
 #include "cloud/cloud_storage_engine.h"
 
 #include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/cloud.pb.h>
 #include <rapidjson/document.h>
 #include <rapidjson/encodings.h>
 #include <rapidjson/prettywriter.h>
@@ -34,6 +35,7 @@
 #include "io/fs/file_system.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/fs/s3_file_system.h"
+#include "io/hdfs_util.h"
 #include "olap/cumulative_compaction_policy.h"
 #include "olap/memtable_flush_executor.h"
 #include "olap/storage_policy.h"
@@ -82,9 +84,11 @@ struct VaultCreateFSVisitor {
     }
 
     // TODO(ByteYue): Make sure enable_java_support is on
-    Status operator()(const THdfsParams& hdfs_params) const {
-        auto fs = DORIS_TRY(
-                io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr));
+    Status operator()(const cloud::HdfsVaultInfo& vault) const {
+        auto hdfs_params = io::to_hdfs_params(vault);
+        auto fs =
+                DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, id, nullptr,
+                                                     vault.has_prefix() ? vault.prefix() : ""));
         put_storage_resource(id, {std::move(fs), 0});
         LOG_INFO("successfully create hdfs vault, vault id {}", id);
         return Status::OK();
@@ -107,7 +111,7 @@ struct RefreshFSVaultVisitor {
         return st;
     }
 
-    Status operator()(const THdfsParams& hdfs_params) const {
+    Status operator()(const cloud::HdfsVaultInfo& vault_info) const {
         // TODO(ByteYue): Implement the hdfs fs refresh logic
         return Status::OK();
     }
@@ -117,7 +121,7 @@ struct RefreshFSVaultVisitor {
 };
 
 Status CloudStorageEngine::open() {
-    std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>> vault_infos;
+    cloud::StorageVaultInfos vault_infos;
     do {
         auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
         if (st.ok()) {
@@ -133,7 +137,6 @@ Status CloudStorageEngine::open() {
     for (auto& [id, vault_info] : vault_infos) {
         RETURN_IF_ERROR(std::visit(VaultCreateFSVisitor {id}, vault_info));
     }
-
     set_latest_fs(get_filesystem(std::get<0>(vault_infos.back())));
 
     // TODO(plat1ko): DeleteBitmapTxnManager
@@ -239,7 +242,7 @@ Status CloudStorageEngine::start_bg_threads() {
 void CloudStorageEngine::_refresh_storage_vault_info_thread_callback() {
     while (!_stop_background_threads_latch.wait_for(
             std::chrono::seconds(config::refresh_s3_info_interval_s))) {
-        std::vector<std::tuple<std::string, std::variant<S3Conf, THdfsParams>>> vault_infos;
+        cloud::StorageVaultInfos vault_infos;
         auto st = _meta_mgr->get_storage_vault_info(&vault_infos);
         if (!st.ok()) {
             LOG(WARNING) << "failed to get storage vault info. err=" << st;
diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp
index d6b491c8db9..21a9140ad4a 100644
--- a/be/src/io/hdfs_util.cpp
+++ b/be/src/io/hdfs_util.cpp
@@ -17,6 +17,8 @@
 
 #include "io/hdfs_util.h"
 
+#include <gen_cpp/cloud.pb.h>
+
 #include <ostream>
 
 #include "common/logging.h"
@@ -140,4 +142,28 @@ bool is_hdfs(const std::string& path_or_fs) {
     return path_or_fs.rfind("hdfs://") == 0;
 }
 
+THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault) {
+    THdfsParams params;
+    auto build_conf = vault.build_conf();
+    params.__set_fs_name(build_conf.fs_name());
+    if (build_conf.has_user()) {
+        params.__set_user(build_conf.user());
+    }
+    if (build_conf.has_hdfs_kerberos_principal()) {
+        params.__set_hdfs_kerberos_principal(build_conf.hdfs_kerberos_principal());
+    }
+    if (build_conf.has_hdfs_kerberos_keytab()) {
+        params.__set_hdfs_kerberos_keytab(build_conf.hdfs_kerberos_keytab());
+    }
+    std::vector<THdfsConf> tconfs;
+    for (const auto& confs : vault.build_conf().hdfs_confs()) {
+        THdfsConf conf;
+        conf.__set_key(confs.key());
+        conf.__set_value(confs.value());
+        tconfs.emplace_back(conf);
+    }
+    params.__set_hdfs_conf(tconfs);
+    return params;
+}
+
 } // namespace doris::io
diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h
index ccb4c9458ad..f1b236887d5 100644
--- a/be/src/io/hdfs_util.h
+++ b/be/src/io/hdfs_util.h
@@ -27,6 +27,10 @@
 #include "io/fs/hdfs.h"
 #include "io/fs/path.h"
 
+namespace cloud {
+class HdfsVaultInfo;
+}
+
 namespace doris {
 class HDFSCommonBuilder;
 class THdfsParams;
@@ -125,5 +129,7 @@ std::string get_fs_name(const std::string& path);
 // return true if path_or_fs contains "hdfs://"
 bool is_hdfs(const std::string& path_or_fs);
 
+THdfsParams to_hdfs_params(const cloud::HdfsVaultInfo& vault);
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp
index 4192610e2f0..6c08f7c49f8 100644
--- a/be/src/olap/storage_policy.cpp
+++ b/be/src/olap/storage_policy.cpp
@@ -43,7 +43,7 @@ Status get_remote_file_system(int64_t storage_policy_id,
     auto resource = get_storage_resource(storage_policy->resource_id);
     *fs = resource.fs;
     if (*fs == nullptr) {
-        return Status::NotFound<false>("could not find resource, resouce_id={}",
+        return Status::NotFound<false>("could not find resource, resource_id={}",
                                        storage_policy->resource_id);
     }
     return Status::OK();
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 38eb11e814b..7007f097e69 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -466,6 +466,7 @@ Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& req
                 .table_schema_param = _schema,
                 .is_high_priority = _is_high_priority,
                 .write_file_cache = request.write_file_cache(),
+                .storage_vault_id = request.storage_vault_id(),
         };
 
         auto delta_writer = create_delta_writer(wrequest);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java
index f1a50102eb6..6332acd04b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java
@@ -21,12 +21,14 @@ import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.Map;
+import java.util.Set;
 
 /**
  * HDFS resource
@@ -37,21 +39,31 @@ import java.util.Map;
  * (
  * "type" = "hdfs",
  * "fs.defaultFS" = "hdfs://10.220.147.151:8020",
- * "fs.prefix" = "",
+ * "path_prefix" = "/path/to/data",
  * "hadoop.username" = "root"
  * );
  */
 public class HdfsStorageVault extends StorageVault {
     private static final Logger LOG = LogManager.getLogger(HdfsStorageVault.class);
+
+    public static final String VAULT_TYPE = "type";
     public static final String HADOOP_FS_PREFIX = "dfs.";
     public static String HADOOP_FS_NAME = "fs.defaultFS";
-    public static String HADOOP_PREFIX = "fs.prefix";
+    public static String VAULT_PATH_PREFIX = "path_prefix";
     public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
     public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
     public static String DSF_NAMESERVICES = "dfs.nameservices";
     public static final String HDFS_PREFIX = "hdfs:";
     public static final String HDFS_FILE_PREFIX = "hdfs://";
 
+    /**
+     * Property keys used by Doris that should not be put in HDFS client configs,
+     * such as `type`, `path_prefix`, etc.
+     */
+    private static final Set<String> nonHdfsConfPropertyKeys = ImmutableSet.of(VAULT_TYPE, VAULT_PATH_PREFIX)
+            .stream().map(String::toLowerCase)
+            .collect(ImmutableSet.toImmutableSet());
+
     @SerializedName(value = "properties")
     private Map<String, String> properties;
 
@@ -67,26 +79,11 @@ public class HdfsStorageVault extends StorageVault {
         }
     }
 
-    @Override
-    protected void setProperties(Map<String, String> properties) throws DdlException {
-        // `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` should be both set to enable short circuit read.
-        // We should disable short circuit read if they are not both set because it will cause performance down.
-        if (!(enableShortCircuitRead(properties))) {
-            properties.put(HADOOP_SHORT_CIRCUIT, "false");
-        }
-        this.properties = properties;
-    }
-
     @Override
     public Map<String, String> getCopiedProperties() {
         return Maps.newHashMap(properties);
     }
 
-    public static boolean enableShortCircuitRead(Map<String, String> properties) {
-        return "true".equalsIgnoreCase(properties.getOrDefault(HADOOP_SHORT_CIRCUIT, "false"))
-                    && properties.containsKey(HADOOP_SOCKET_PATH);
-    }
-
     public static Cloud.HdfsVaultInfo generateHdfsParam(Map<String, String> properties) {
         Cloud.HdfsVaultInfo.Builder hdfsVaultInfoBuilder =
                     Cloud.HdfsVaultInfo.newBuilder();
@@ -94,7 +91,7 @@ public class HdfsStorageVault extends StorageVault {
         for (Map.Entry<String, String> property : properties.entrySet()) {
             if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
                 hdfsConfBuilder.setFsName(property.getValue());
-            } else if (property.getKey().equalsIgnoreCase(HADOOP_PREFIX)) {
+            } else if (property.getKey().equalsIgnoreCase(VAULT_PATH_PREFIX)) {
                 hdfsVaultInfoBuilder.setPrefix(property.getValue());
             } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_USER_NAME)) {
                 hdfsConfBuilder.setUser(property.getValue());
@@ -103,10 +100,12 @@ public class HdfsStorageVault extends StorageVault {
             } else if (property.getKey().equalsIgnoreCase(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB)) {
                 hdfsConfBuilder.setHdfsKerberosKeytab(property.getValue());
             } else {
-                Cloud.HdfsBuildConf.HdfsConfKVPair.Builder conf = Cloud.HdfsBuildConf.HdfsConfKVPair.newBuilder();
-                conf.setKey(property.getKey());
-                conf.setValue(property.getValue());
-                hdfsConfBuilder.addHdfsConfs(conf.build());
+                if (!nonHdfsConfPropertyKeys.contains(property.getKey().toLowerCase())) {
+                    Cloud.HdfsBuildConf.HdfsConfKVPair.Builder conf = Cloud.HdfsBuildConf.HdfsConfKVPair.newBuilder();
+                    conf.setKey(property.getKey());
+                    conf.setValue(property.getValue());
+                    hdfsConfBuilder.addHdfsConfs(conf.build());
+                }
             }
         }
         return hdfsVaultInfoBuilder.setBuildConf(hdfsConfBuilder.build()).build();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java
index 98f35ac1695..f6e0525d17b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/StorageVault.java
@@ -84,9 +84,7 @@ public abstract class StorageVault {
     }
 
     public static StorageVault fromStmt(CreateStorageVaultStmt stmt) throws DdlException {
-        StorageVault storageVault = getStorageVaultInstance(stmt);
-        storageVault.setProperties(stmt.getProperties());
-        return storageVault;
+        return getStorageVaultInstance(stmt);
     }
 
     public boolean ifNotExists() {
@@ -153,10 +151,5 @@ public abstract class StorageVault {
         }
     }
 
-    /**
-     * Set and check the properties in child resources
-     */
-    protected abstract void setProperties(Map<String, String> properties) throws DdlException;
-
     public abstract Map<String, String> getCopiedProperties();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
