This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 2d3a3b1a69a1d9db8edefc68b9165e403a28ae2e
Author: morningman <morning...@163.com>
AuthorDate: Fri Jun 17 11:48:58 2022 +0800

    [feature] Support hive on s3 (#10128)

    Support querying Hive tables on S3. Pass the AK/SK, region and S3 endpoint
    to the Hive table as properties when creating the external table.

    Example CREATE TABLE statement:
    ```
    CREATE TABLE `region_s3` (
        `r_regionkey` integer NOT NULL,
        `r_name` char(25) NOT NULL,
        `r_comment` varchar(152)
    ) engine=hive
    properties (
        "database"="default",
        "table"="region_s3",
        "hive.metastore.uris"="thrift://127.0.0.1:9083",
        "AWS_ACCESS_KEY"="YOUR_ACCESS_KEY",
        "AWS_SECRET_KEY"="YOUR_SECRET_KEY",
        "AWS_ENDPOINT"="s3.us-east-1.amazonaws.com",
        "AWS_REGION"="us-east-1");
    ```
---
 .../doris/catalog/HiveMetaStoreClientHelper.java | 43 +++++++++++++++++++---
 .../java/org/apache/doris/catalog/HiveTable.java | 19 ++++++----
 .../org/apache/doris/planner/HiveScanNode.java   | 21 +++++++++--
 3 files changed, 68 insertions(+), 15 deletions(-)
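For reference, once such a table exists it is queried like any other Doris table. The following is a minimal illustrative query against the `region_s3` example above (table and column names are taken from the CREATE TABLE statement, not from this commit); Doris lists the data files through the Hive metastore and reads them from S3 using the AWS_* properties:

```
-- Illustrative query against the region_s3 Hive-on-S3 external table defined above.
-- The file list is resolved via hive.metastore.uris; the data files themselves are
-- read from S3 using the AWS_ACCESS_KEY/AWS_SECRET_KEY/AWS_ENDPOINT/AWS_REGION settings.
SELECT r_regionkey, r_name
FROM region_s3
WHERE r_regionkey < 3
ORDER BY r_regionkey;
```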
+ // path = "s3://bucket/path/to/partition/file_name" + // eg: s3://hive-s3-test/region/region.tbl + path = fileStatus.getPath().toString(); + } brokerFileStatus.setPath(path); fileStatuses.add(brokerFileStatus); if (StringUtils.isEmpty(hdfsUrl)) { @@ -222,7 +230,24 @@ public class HiveMetaStoreClientHelper { return hdfsUrl; } - private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions, Map<String, String> properties) throws DdlException { + private static void setS3Configuration(Configuration configuration, Map<String, String> properties) { + if (properties.containsKey(HiveTable.S3_AK)) { + configuration.set("fs.s3a.access.key", properties.get(HiveTable.S3_AK)); + } + if (properties.containsKey(HiveTable.S3_SK)) { + configuration.set("fs.s3a.secret.key", properties.get(HiveTable.S3_SK)); + } + if (properties.containsKey(HiveTable.S3_ENDPOINT)) { + configuration.set("fs.s3a.endpoint", properties.get(HiveTable.S3_ENDPOINT)); + } + configuration.set("fs.s3.impl.disable.cache", "true"); + configuration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"); + configuration.set("fs.s3a.attempts.maximum", "2"); + } + + private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator( + List<Partition> partitions, Map<String, String> properties, boolean onS3) + throws DdlException { List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>(); Configuration configuration = new Configuration(false); for (Map.Entry<String, String> entry : properties.entrySet()) { @@ -230,6 +255,9 @@ public class HiveMetaStoreClientHelper { configuration.set(entry.getKey(), entry.getValue()); } } + if (onS3) { + setS3Configuration(configuration, properties); + } for (Partition p : partitions) { String location = p.getSd().getLocation(); org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location); @@ -244,7 +272,9 @@ public class HiveMetaStoreClientHelper { return iterators; } - private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Map<String, String> properties) throws DdlException { + private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator( + Table table, Map<String, String> properties, boolean onS3) + throws DdlException { List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>(); Configuration configuration = new Configuration(false); boolean isSecurityEnabled = false; @@ -257,6 +287,9 @@ public class HiveMetaStoreClientHelper { isSecurityEnabled = true; } } + if (onS3) { + setS3Configuration(configuration, properties); + } String location = table.getSd().getLocation(); org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java index 19be317f12..812185d15e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java @@ -42,15 +42,19 @@ public class HiveTable extends Table { private static final String PROPERTY_MISSING_MSG = "Hive %s is null. Please add properties('%s'='xxx') when create table"; private static final String PROPERTY_ERROR_MSG = "Hive table properties('%s'='%s') is illegal or not supported. 
             "Hive table properties('%s'='%s') is illegal or not supported. Please check it";
 
-    private static final String HIVE_DB = "database";
-    private static final String HIVE_TABLE = "table";
-    public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
-    public static final String HIVE_HDFS_PREFIX = "dfs";
-
     private String hiveDb;
     private String hiveTable;
     private Map<String, String> hiveProperties = Maps.newHashMap();
 
+    public static final String HIVE_DB = "database";
+    public static final String HIVE_TABLE = "table";
+    public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
+    public static final String HIVE_HDFS_PREFIX = "dfs";
+    public static final String S3_PROPERTIES_PREFIX = "AWS";
+    public static final String S3_AK = "AWS_ACCESS_KEY";
+    public static final String S3_SK = "AWS_SECRET_KEY";
+    public static final String S3_ENDPOINT = "AWS_ENDPOINT";
+
     public HiveTable() {
         super(TableType.HIVE);
     }
@@ -142,8 +146,9 @@ public class HiveTable extends Table {
         Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator();
         while(iter.hasNext()) {
             Map.Entry<String, String> entry = iter.next();
-            if (entry.getKey().startsWith(HIVE_HDFS_PREFIX)) {
-                hiveProperties.put(entry.getKey(), entry.getValue());
+            String key = entry.getKey();
+            if (key.startsWith(HIVE_HDFS_PREFIX) || key.startsWith(S3_PROPERTIES_PREFIX)) {
+                hiveProperties.put(key, entry.getValue());
                 iter.remove();
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index 711e63e695..cfd76de134 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -17,6 +17,7 @@ package org.apache.doris.planner;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.Expr;
@@ -68,6 +69,7 @@ public class HiveScanNode extends BrokerScanNode {
     private String fileFormat;
     private String path;
     private List<String> partitionKeys = new ArrayList<>();
+    private StorageBackend.StorageType storageType;
     /* hive table properties */
 
     public String getHostUri() {
@@ -123,13 +125,26 @@ public class HiveScanNode extends BrokerScanNode {
                 getFileFormat(), getPartitionKeys(), getParsedColumnExprList()));
-        brokerDesc = new BrokerDesc("HiveTableDesc", StorageBackend.StorageType.HDFS, hiveTable.getHiveProperties());
+        brokerDesc = new BrokerDesc("HiveTableDesc", storageType, hiveTable.getHiveProperties());
         targetTable = hiveTable;
     }
 
-    private void initHiveTblProperties() throws DdlException {
+    private void setStorageType(String location) throws UserException {
+        String[] strings = StringUtils.split(location, "/");
+        String storagePrefix = strings[0].split(":")[0];
+        if (storagePrefix.equalsIgnoreCase("s3")) {
+            this.storageType = StorageBackend.StorageType.S3;
+        } else if (storagePrefix.equalsIgnoreCase("hdfs")) {
+            this.storageType = StorageBackend.StorageType.HDFS;
+        } else {
+            throw new UserException("Not supported storage type: " + storagePrefix);
+        }
+    }
+
+    private void initHiveTblProperties() throws UserException {
         this.remoteHiveTable = HiveMetaStoreClientHelper.getTable(hiveTable);
         this.fileFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(remoteHiveTable.getSd().getInputFormat());
+        this.setStorageType(remoteHiveTable.getSd().getLocation());
         Map<String, String> serDeInfoParams = remoteHiveTable.getSd().getSerdeInfo().getParameters();
         this.columnSeparator =
                 Strings.isNullOrEmpty(serDeInfoParams.get("field.delim")) ?
@@ -179,7 +194,7 @@ public class HiveScanNode extends BrokerScanNode {
             }
             List<TBrokerFileStatus> fileStatuses = new ArrayList<>();
             this.hdfsUri = HiveMetaStoreClientHelper.getHiveDataFiles(hiveTable, hivePartitionPredicate,
-                    fileStatuses, remoteHiveTable);
+                    fileStatuses, remoteHiveTable, storageType);
             fileStatusesList.add(fileStatuses);
             filesAdded += fileStatuses.size();
             for (TBrokerFileStatus fstatus : fileStatuses) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org