This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 97fa840324 [feature](multi-catalog)support iceberg hadoop catalog external table query (#22949) 97fa840324 is described below commit 97fa840324088a10b31b2e14f7d19f03fd0e067c Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Sun Aug 20 19:29:25 2023 +0800 [feature](multi-catalog)support iceberg hadoop catalog external table query (#22949) support iceberg hadoop catalog external table query --- docs/en/docs/lakehouse/multi-catalog/iceberg.md | 59 ++++++++++++++++++-- docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md | 61 ++++++++++++++++++--- .../org/apache/doris/catalog/HdfsResource.java | 1 + .../datasource/iceberg/IcebergExternalCatalog.java | 1 + .../iceberg/IcebergExternalCatalogFactory.java | 2 + .../iceberg/IcebergHadoopExternalCatalog.java | 62 ++++++++++++++++++++++ .../org/apache/doris/persist/gson/GsonUtils.java | 2 + .../planner/external/iceberg/IcebergScanNode.java | 23 ++++++-- ...est_external_catalog_iceberg_hadoop_catalog.out | 17 ++++++ ..._external_catalog_iceberg_hadoop_catalog.groovy | 43 +++++++++++++++ 10 files changed, 256 insertions(+), 15 deletions(-) diff --git a/docs/en/docs/lakehouse/multi-catalog/iceberg.md b/docs/en/docs/lakehouse/multi-catalog/iceberg.md index 54af57bee8..2baa05770f 100644 --- a/docs/en/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/en/docs/lakehouse/multi-catalog/iceberg.md @@ -53,7 +53,30 @@ CREATE CATALOG iceberg PROPERTIES ( ### Create Catalog based on Iceberg API -Use the Iceberg API to access metadata, and support services such as Hive, REST, DLF and Glue as Iceberg's Catalog. +Use the Iceberg API to access metadata, and support services such as Hadoop File System, Hive, REST, DLF and Glue as Iceberg's Catalog. + +#### Hadoop Catalog + +```sql +CREATE CATALOG iceberg_hadoop PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type' = 'hadoop', + 'warehouse' = 'hdfs://your-host:8020/dir/key' +); +``` + +```sql +CREATE CATALOG iceberg_hadoop_ha PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type' = 'hadoop', + 'warehouse' = 'hdfs://your-nameservice/dir/key', + 'dfs.nameservices'='your-nameservice', + 'dfs.ha.namenodes.your-nameservice'='nn1,nn2', + 'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007', + 'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007', + 'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider' +); +``` #### Hive Metastore @@ -133,16 +156,42 @@ CREATE CATALOG iceberg PROPERTIES ( `hive.metastore.uris`: Dataproc Metastore URI,See in Metastore Services :[Dataproc Metastore Services](https://console.cloud.google.com/dataproc/metastore). -### Iceberg On S3 +### Iceberg On Object Storage If the data is stored on S3, the following parameters can be used in properties: ``` "s3.access_key" = "ak" "s3.secret_key" = "sk" -"s3.endpoint" = "http://endpoint-uri" -"s3.region" = "your-region" -"s3.credentials.provider" = "provider-class-name" // 可选,默认凭证类基于BasicAWSCredentials实现。 +"s3.endpoint" = "s3.us-east-1.amazonaws.com" +"s3.region" = "us-east-1" +``` + +The data is stored on Alibaba Cloud OSS: + +``` +"oss.access_key" = "ak" +"oss.secret_key" = "sk" +"oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com" +"oss.region" = "oss-cn-beijing" +``` + +The data is stored on Tencent Cloud COS: + +``` +"cos.access_key" = "ak" +"cos.secret_key" = "sk" +"cos.endpoint" = "cos.ap-beijing.myqcloud.com" +"cos.region" = "ap-beijing" +``` + +The data is stored on Huawei Cloud OBS: + +``` +"obs.access_key" = "ak" +"obs.secret_key" = "sk" +"obs.endpoint" = "obs.cn-north-4.myhuaweicloud.com" +"obs.region" = "cn-north-4" ``` ## Column type mapping diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md index ab5b447005..3e6a4826d0 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md @@ -53,7 +53,30 @@ CREATE CATALOG iceberg PROPERTIES ( ### 基于Iceberg API创建Catalog -使用Iceberg API访问元数据的方式,支持Hive、REST、Glue、DLF等服务作为Iceberg的Catalog。 +使用Iceberg API访问元数据的方式,支持Hadoop File System、Hive、REST、Glue、DLF等服务作为Iceberg的Catalog。 + +#### Hadoop Catalog + +```sql +CREATE CATALOG iceberg_hadoop PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type' = 'hadoop', + 'warehouse' = 'hdfs://your-host:8020/dir/key' +); +``` + +```sql +CREATE CATALOG iceberg_hadoop_ha PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type' = 'hadoop', + 'warehouse' = 'hdfs://your-nameservice/dir/key', + 'dfs.nameservices'='your-nameservice', + 'dfs.ha.namenodes.your-nameservice'='nn1,nn2', + 'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007', + 'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007', + 'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider' +); +``` #### Hive Metastore @@ -133,16 +156,42 @@ CREATE CATALOG iceberg PROPERTIES ( `hive.metastore.uris`: Dataproc Metastore 服务开放的接口,在 Metastore 管理页面获取 :[Dataproc Metastore Services](https://console.cloud.google.com/dataproc/metastore). -### Iceberg On S3 +### Iceberg On Object Storage -若数据存放在S3上,properties中可以使用以下参数 +若数据存放在S3上,properties中可以使用以下参数: ``` "s3.access_key" = "ak" "s3.secret_key" = "sk" -"s3.endpoint" = "http://endpoint-uri" -"s3.region" = "your-region" -"s3.credentials.provider" = "provider-class-name" // 可选,默认凭证类基于BasicAWSCredentials实现。 +"s3.endpoint" = "s3.us-east-1.amazonaws.com" +"s3.region" = "us-east-1" +``` + +数据存放在阿里云OSS上: + +``` +"oss.access_key" = "ak" +"oss.secret_key" = "sk" +"oss.endpoint" = "oss-cn-beijing-internal.aliyuncs.com" +"oss.region" = "oss-cn-beijing" +``` + +数据存放在腾讯云COS上: + +``` +"cos.access_key" = "ak" +"cos.secret_key" = "sk" +"cos.endpoint" = "cos.ap-beijing.myqcloud.com" +"cos.region" = "ap-beijing" +``` + +数据存放在华为云OBS上: + +``` +"obs.access_key" = "ak" +"obs.secret_key" = "sk" +"obs.endpoint" = "obs.cn-north-4.myhuaweicloud.com" +"obs.region" = "cn-north-4" ``` ## 列类型映射 diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java index 1f87f93e9d..d2a03aaf90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java @@ -54,6 +54,7 @@ public class HdfsResource extends Resource { public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path"; public static String DSF_NAMESERVICES = "dfs.nameservices"; public static final String HDFS_PREFIX = "hdfs:"; + public static final String HDFS_FILE_PREFIX = "hdfs://"; @SerializedName(value = "properties") private Map<String, String> properties; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index 50816b77a0..8df4acfc8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -44,6 +44,7 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type"; public static final String ICEBERG_REST = "rest"; public static final String ICEBERG_HMS = "hms"; + public static final String ICEBERG_HADOOP = "hadoop"; public static final String ICEBERG_GLUE = "glue"; public static final String ICEBERG_DLF = "dlf"; protected String icebergCatalogType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java index 6ea5a3a73b..e8f593f293 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java @@ -39,6 +39,8 @@ public class IcebergExternalCatalogFactory { return new IcebergGlueExternalCatalog(catalogId, name, resource, props, comment); case IcebergExternalCatalog.ICEBERG_DLF: return new IcebergDLFExternalCatalog(catalogId, name, resource, props, comment); + case IcebergExternalCatalog.ICEBERG_HADOOP: + return new IcebergHadoopExternalCatalog(catalogId, name, resource, props, comment); default: throw new DdlException("Unknown " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + " value: " + catalogType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java new file mode 100644 index 0000000000..06d1a4caaa --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.catalog.HdfsResource; +import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.property.PropertyConverter; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.hadoop.HadoopCatalog; + +import java.util.HashMap; +import java.util.Map; + +public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog { + + public IcebergHadoopExternalCatalog(long catalogId, String name, String resource, Map<String, String> props, + String comment) { + super(catalogId, name, comment); + props = PropertyConverter.convertToMetaProperties(props); + String warehouse = props.get(CatalogProperties.WAREHOUSE_LOCATION); + Preconditions.checkArgument(StringUtils.isNotEmpty(warehouse), + "Cannot initialize Iceberg HadoopCatalog because 'warehouse' must not be null or empty"); + String nameService = StringUtils.substringBetween(warehouse, HdfsResource.HDFS_FILE_PREFIX, "/"); + if (StringUtils.isEmpty(nameService)) { + throw new IllegalArgumentException("Unrecognized 'warehouse' location format" + + " because name service is required."); + } + catalogProperty = new CatalogProperty(resource, props); + catalogProperty.addProperty(HdfsResource.HADOOP_FS_NAME, HdfsResource.HDFS_FILE_PREFIX + nameService); + } + + @Override + protected void initLocalObjectsImpl() { + icebergCatalogType = ICEBERG_HADOOP; + HadoopCatalog hadoopCatalog = new HadoopCatalog(); + hadoopCatalog.setConf(getConfiguration()); + // initialize hive catalog + Map<String, String> catalogProperties = new HashMap<>(); + String warehouse = catalogProperty.getProperties().get(CatalogProperties.WAREHOUSE_LOCATION); + catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse); + hadoopCatalog.initialize(icebergCatalogType, catalogProperties); + catalog = hadoopCatalog; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 892ffbeca5..196e68bad3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -68,6 +68,7 @@ import org.apache.doris.datasource.iceberg.IcebergDLFExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog; +import org.apache.doris.datasource.iceberg.IcebergHadoopExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog; import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; @@ -205,6 +206,7 @@ public class GsonUtils { .registerSubtype(IcebergGlueExternalCatalog.class, IcebergGlueExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName()) + .registerSubtype(IcebergHadoopExternalCatalog.class, IcebergHadoopExternalCatalog.class.getSimpleName()) .registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName()) .registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName()) .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index 7293982ebb..c89c606b8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.catalog.external.HMSExternalTable; @@ -111,6 +112,7 @@ public class IcebergScanNode extends FileQueryScanNode { case IcebergExternalCatalog.ICEBERG_REST: case IcebergExternalCatalog.ICEBERG_DLF: case IcebergExternalCatalog.ICEBERG_GLUE: + case IcebergExternalCatalog.ICEBERG_HADOOP: source = new IcebergApiSource((IcebergExternalTable) table, desc, columnNameToRange); break; default: @@ -194,7 +196,7 @@ public class IcebergScanNode extends FileQueryScanNode { // Min split size is DEFAULT_SPLIT_SIZE(128MB). long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE); HashSet<String> partitionPathSet = new HashSet<>(); - String dataPath = icebergTable.location() + icebergTable.properties() + String dataPath = normalizeLocation(icebergTable.location()) + icebergTable.properties() .getOrDefault(TableProperties.WRITE_DATA_LOCATION, DEFAULT_DATA_PATH); boolean isPartitionedTable = icebergTable.spec().isPartitioned(); @@ -202,7 +204,7 @@ public class IcebergScanNode extends FileQueryScanNode { try (CloseableIterable<CombinedScanTask> combinedScanTasks = TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) { combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> { - String dataFilePath = splitTask.file().path().toString(); + String dataFilePath = normalizeLocation(splitTask.file().path().toString()); // Counts the number of partitions read if (isPartitionedTable) { @@ -311,8 +313,21 @@ public class IcebergScanNode extends FileQueryScanNode { @Override public TFileType getLocationType(String location) throws UserException { - return getTFileType(location).orElseThrow(() -> - new DdlException("Unknown file location " + location + " for iceberg table " + icebergTable.name())); + final String fLocation = normalizeLocation(location); + return getTFileType(fLocation).orElseThrow(() -> + new DdlException("Unknown file location " + fLocation + " for iceberg table " + icebergTable.name())); + } + + private String normalizeLocation(String location) { + Map<String, String> props = source.getCatalog().getProperties(); + String icebergCatalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE); + if (icebergCatalogType.equalsIgnoreCase("hadoop")) { + if (!location.startsWith(HdfsResource.HDFS_PREFIX)) { + String fsName = props.get(HdfsResource.HADOOP_FS_NAME); + location = fsName + location; + } + } + return location; } @Override diff --git a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out new file mode 100644 index 0000000000..fa1a58f6f1 --- /dev/null +++ b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out @@ -0,0 +1,17 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !q01 -- +2879562 + +-- !q02 -- +1 +3 +5 +6 +7 +8 +11 + +-- !q03 -- +1 Customer#000000001 j5JsirBM9P MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING +3 Customer#000000003 fkRGN8n ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE +5 Customer#000000005 hwBtxkoBF qSW4KrI CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD diff --git a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy new file mode 100644 index 0000000000..b35a799b28 --- /dev/null +++ b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_external_catalog_iceberg_hadoop_catalog", "p2,external,iceberg,external_remote,external_remote_iceberg") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String iceberg_catalog_name = "test_external_iceberg_catalog_hadoop" + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHdfsPort = context.config.otherConfigs.get("extHdfsPort") + sql """drop catalog if exists ${iceberg_catalog_name};""" + sql """ + create catalog if not exists ${iceberg_catalog_name} properties ( + 'type'='iceberg', + 'iceberg.catalog.type'='hadoop', + 'warehouse' = 'hdfs://${extHiveHmsHost}:${extHdfsPort}/usr/hive/warehouse/hadoop_catalog' + ); + """ + + sql """switch ${iceberg_catalog_name};""" + def q01 = { + qt_q01 """ select count(*) from iceberg_hadoop_catalog """ + qt_q02 """ select c_custkey from iceberg_hadoop_catalog group by c_custkey order by c_custkey limit 7 """ + qt_q03 """ select * from iceberg_hadoop_catalog order by c_custkey limit 3 """ + } + + sql """ use `multi_catalog`; """ + q01() + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org