This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 2d3a3b1a69a1d9db8edefc68b9165e403a28ae2e
Author: morningman <morning...@163.com>
AuthorDate: Fri Jun 17 11:48:58 2022 +0800

    [feature] Support hive on s3 (#10128)
    
    Support querying Hive tables stored on S3. The AK/SK, region, and S3 endpoint
    are passed to the Hive table as properties when creating the external table.
    
    Example CREATE TABLE statement:
    ```
    CREATE TABLE `region_s3` (
        `r_regionkey` integer NOT NULL,
        `r_name` char(25) NOT NULL,
        `r_comment` varchar(152)
    )
    ENGINE=HIVE
    PROPERTIES (
        "database"="default",
        "table"="region_s3",
        "hive.metastore.uris"="thrift://127.0.0.1:9083",
        "AWS_ACCESS_KEY"="YOUR_ACCESS_KEY",
        "AWS_SECRET_KEY"="YOUR_SECRET_KEY",
        "AWS_ENDPOINT"="s3.us-east-1.amazonaws.com",
        "AWS_REGION"="us-east-1"
    );
    ```
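    
    Internally, the FE forwards these AWS_* table properties to Hadoop's S3A
    client configuration before listing the table's data files (see
    setS3Configuration in the patch below). A minimal standalone sketch of that
    property mapping; the class name and main() here are illustrative only, while
    the property keys and fs.s3a.* settings come from the patch:
    
    ```
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    
    public class S3PropertyMappingSketch {
        // Map the table's AWS_* properties onto Hadoop S3A settings.
        static Configuration toS3aConf(Map<String, String> props) {
            Configuration conf = new Configuration(false);
            if (props.containsKey("AWS_ACCESS_KEY")) {
                conf.set("fs.s3a.access.key", props.get("AWS_ACCESS_KEY"));
            }
            if (props.containsKey("AWS_SECRET_KEY")) {
                conf.set("fs.s3a.secret.key", props.get("AWS_SECRET_KEY"));
            }
            if (props.containsKey("AWS_ENDPOINT")) {
                conf.set("fs.s3a.endpoint", props.get("AWS_ENDPOINT"));
            }
            // Route s3:// locations through the S3A filesystem implementation.
            conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
            conf.set("fs.s3.impl.disable.cache", "true");
            conf.set("fs.s3a.attempts.maximum", "2");
            return conf;
        }
    
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("AWS_ACCESS_KEY", "YOUR_ACCESS_KEY");
            props.put("AWS_SECRET_KEY", "YOUR_SECRET_KEY");
            props.put("AWS_ENDPOINT", "s3.us-east-1.amazonaws.com");
            System.out.println(toS3aConf(props).get("fs.s3a.endpoint"));
        }
    }
    ```
    
    Any property whose key starts with "AWS" is kept in hiveProperties via
    S3_PROPERTIES_PREFIX, so AWS_REGION travels with the rest even though only
    the AK/SK and endpoint are mapped explicitly.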
---
 .../doris/catalog/HiveMetaStoreClientHelper.java   | 43 +++++++++++++++++++---
 .../java/org/apache/doris/catalog/HiveTable.java   | 19 ++++++----
 .../org/apache/doris/planner/HiveScanNode.java     | 21 +++++++++--
 3 files changed, 68 insertions(+), 15 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index f192df298b..b49a7e5d7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -29,6 +29,7 @@ import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.BrokerUtil;
@@ -170,10 +171,11 @@ public class HiveMetaStoreClientHelper {
      * @throws DdlException
      */
     public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
-                                          List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl) throws DdlException {
+                                          List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type) throws DdlException {
         HiveMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
 
         List<RemoteIterator<LocatedFileStatus>> remoteIterators;
+        Boolean onS3 = type.equals(StorageBackend.StorageType.S3);
         if (remoteHiveTbl.getPartitionKeys().size() > 0) {
             // hive partitioned table, get file iterator from table partition sd info
             List<Partition> hivePartitions = new ArrayList<>();
@@ -186,10 +188,10 @@ public class HiveMetaStoreClientHelper {
             } finally {
                 client.close();
             }
-            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties());
+            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties(), onS3);
         } else {
             // hive non-partitioned table, get file iterator from table sd info
-            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties());
+            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties(), onS3);
         }
 
         String hdfsUrl = "";
@@ -204,6 +206,12 @@ public class HiveMetaStoreClientHelper {
                     // path = "/path/to/partition/file_name"
                     // eg: /home/work/dev/hive/apache-hive-2.3.7-bin/data/warehouse/dae.db/customer/state=CA/city=SanJose/000000_0
                     String path = fileStatus.getPath().toUri().getPath();
+                    if (onS3) {
+                        // Backend needs the full s3 path (with s3://bucket at the beginning) to read the data on s3.
+                        // path = "s3://bucket/path/to/partition/file_name"
+                        // eg: s3://hive-s3-test/region/region.tbl
+                        path = fileStatus.getPath().toString();
+                    }
                     brokerFileStatus.setPath(path);
                     fileStatuses.add(brokerFileStatus);
                     if (StringUtils.isEmpty(hdfsUrl)) {
@@ -222,7 +230,24 @@ public class HiveMetaStoreClientHelper {
         return hdfsUrl;
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions, Map<String, String> properties) throws DdlException {
+    private static void setS3Configuration(Configuration configuration, Map<String, String> properties) {
+        if (properties.containsKey(HiveTable.S3_AK)) {
+            configuration.set("fs.s3a.access.key", properties.get(HiveTable.S3_AK));
+        }
+        if (properties.containsKey(HiveTable.S3_SK)) {
+            configuration.set("fs.s3a.secret.key", properties.get(HiveTable.S3_SK));
+        }
+        if (properties.containsKey(HiveTable.S3_ENDPOINT)) {
+            configuration.set("fs.s3a.endpoint", properties.get(HiveTable.S3_ENDPOINT));
+        }
+        configuration.set("fs.s3.impl.disable.cache", "true");
+        configuration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+        configuration.set("fs.s3a.attempts.maximum", "2");
+    }
+
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
+            List<Partition> partitions, Map<String, String> properties, boolean onS3)
+            throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
         for (Map.Entry<String, String> entry : properties.entrySet()) {
@@ -230,6 +255,9 @@ public class HiveMetaStoreClientHelper {
                 configuration.set(entry.getKey(), entry.getValue());
             }
         }
+        if (onS3) {
+            setS3Configuration(configuration, properties);
+        }
         for (Partition p : partitions) {
             String location = p.getSd().getLocation();
             org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
@@ -244,7 +272,9 @@ public class HiveMetaStoreClientHelper {
         return iterators;
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Map<String, String> properties) throws DdlException {
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
+            Table table, Map<String, String> properties, boolean onS3)
+            throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
         boolean isSecurityEnabled = false;
@@ -257,6 +287,9 @@ public class HiveMetaStoreClientHelper {
                 isSecurityEnabled = true;
             }
         }
+        if (onS3) {
+            setS3Configuration(configuration, properties);
+        }
         String location = table.getSd().getLocation();
         org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index 19be317f12..812185d15e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -42,15 +42,19 @@ public class HiveTable extends Table {
     private static final String PROPERTY_MISSING_MSG = "Hive %s is null. Please add properties('%s'='xxx') when create table";
     private static final String PROPERTY_ERROR_MSG = "Hive table properties('%s'='%s') is illegal or not supported. Please check it";
 
-    private static final String HIVE_DB = "database";
-    private static final String HIVE_TABLE = "table";
-    public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
-    public static final String HIVE_HDFS_PREFIX = "dfs";
-
     private String hiveDb;
     private String hiveTable;
     private Map<String, String> hiveProperties = Maps.newHashMap();
 
+    public static final String HIVE_DB = "database";
+    public static final String HIVE_TABLE = "table";
+    public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
+    public static final String HIVE_HDFS_PREFIX = "dfs";
+    public static final String S3_PROPERTIES_PREFIX = "AWS";
+    public static final String S3_AK = "AWS_ACCESS_KEY";
+    public static final String S3_SK = "AWS_SECRET_KEY";
+    public static final String S3_ENDPOINT = "AWS_ENDPOINT";
+
     public HiveTable() {
         super(TableType.HIVE);
     }
@@ -142,8 +146,9 @@ public class HiveTable extends Table {
             Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator();
             while(iter.hasNext()) {
                 Map.Entry<String, String> entry = iter.next();
-                if (entry.getKey().startsWith(HIVE_HDFS_PREFIX)) {
-                    hiveProperties.put(entry.getKey(), entry.getValue());
+                String key = entry.getKey();
+                if (key.startsWith(HIVE_HDFS_PREFIX) || key.startsWith(S3_PROPERTIES_PREFIX)) {
+                    hiveProperties.put(key, entry.getValue());
                     iter.remove();
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index 711e63e695..cfd76de134 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.planner;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.Expr;
@@ -68,6 +69,7 @@ public class HiveScanNode extends BrokerScanNode {
     private String fileFormat;
     private String path;
     private List<String> partitionKeys = new ArrayList<>();
+    private StorageBackend.StorageType storageType;
     /* hive table properties */
 
     public String getHostUri() {
@@ -123,13 +125,26 @@ public class HiveScanNode extends BrokerScanNode {
                         getFileFormat(),
                         getPartitionKeys(),
                         getParsedColumnExprList()));
-        brokerDesc = new BrokerDesc("HiveTableDesc", StorageBackend.StorageType.HDFS, hiveTable.getHiveProperties());
+        brokerDesc = new BrokerDesc("HiveTableDesc", storageType, hiveTable.getHiveProperties());
         targetTable = hiveTable;
     }
 
-    private void initHiveTblProperties() throws DdlException {
+    private void setStorageType(String location) throws UserException {
+        String[] strings = StringUtils.split(location, "/");
+        String storagePrefix = strings[0].split(":")[0];
+        if (storagePrefix.equalsIgnoreCase("s3")) {
+            this.storageType = StorageBackend.StorageType.S3;
+        } else if (storagePrefix.equalsIgnoreCase("hdfs")) {
+            this.storageType = StorageBackend.StorageType.HDFS;
+        } else {
+            throw new UserException("Not supported storage type: " + storagePrefix);
+        }
+    }
+
+    private void initHiveTblProperties() throws UserException {
         this.remoteHiveTable = HiveMetaStoreClientHelper.getTable(hiveTable);
         this.fileFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(remoteHiveTable.getSd().getInputFormat());
+        this.setStorageType(remoteHiveTable.getSd().getLocation());
 
         Map<String, String> serDeInfoParams = remoteHiveTable.getSd().getSerdeInfo().getParameters();
         this.columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim")) ?
@@ -179,7 +194,7 @@ public class HiveScanNode extends BrokerScanNode {
         }
         List<TBrokerFileStatus> fileStatuses = new ArrayList<>();
         this.hdfsUri = HiveMetaStoreClientHelper.getHiveDataFiles(hiveTable, hivePartitionPredicate,
-            fileStatuses, remoteHiveTable);
+                fileStatuses, remoteHiveTable, storageType);
         fileStatusesList.add(fileStatuses);
         filesAdded += fileStatuses.size();
         for (TBrokerFileStatus fstatus : fileStatuses) {


