This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new b1b2697cc7 [fix](iceberg) fix iceberg catalog (#16372) b1b2697cc7 is described below commit b1b2697cc7c832071930ed721c9dadb7be396dab Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Sun Feb 5 13:15:28 2023 +0800 [fix](iceberg) fix iceberg catalog (#16372) 1. Fix iceberg catalog access s3 2. Fix iceberg catalog partition table query 3. Fix persistence --- be/src/vec/exec/format/generic_reader.h | 2 +- be/src/vec/exec/format/table/iceberg_reader.cpp | 4 ++ be/src/vec/exec/format/table/iceberg_reader.h | 2 + be/src/vec/exec/scan/scanner_scheduler.cpp | 4 +- docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md | 43 ++++++++++++++++++++++ .../datasource/iceberg/IcebergExternalCatalog.java | 22 ++++++++--- .../iceberg/IcebergExternalCatalogFactory.java | 4 +- .../iceberg/IcebergHMSExternalCatalog.java | 6 +-- .../iceberg/IcebergRestExternalCatalog.java | 29 +++++++++++---- .../org/apache/doris/persist/gson/GsonUtils.java | 2 + 10 files changed, 98 insertions(+), 20 deletions(-) diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index 9f4cfd00ee..5abf064813 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -47,7 +47,7 @@ public: /// If the underlying FileReader has filled the partition&missing columns, /// The FileScanner does not need to fill - bool fill_all_columns() const { return _fill_all_columns; } + virtual bool fill_all_columns() const { return _fill_all_columns; } /// Tell the underlying FileReader the partition&missing columns, /// and the FileReader determine to fill columns or not. diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index adc31e605b..0035323432 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -114,6 +114,10 @@ Status IcebergTableReader::set_fill_columns( return _file_format_reader->set_fill_columns(partition_columns, missing_columns); } +bool IcebergTableReader::fill_all_columns() const { + return _file_format_reader->fill_all_columns(); +}; + Status IcebergTableReader::get_columns( std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) { diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 93a6963c80..f9e480f28b 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -53,6 +53,8 @@ public: partition_columns, const std::unordered_map<std::string, VExprContext*>& missing_columns) override; + bool fill_all_columns() const override; + Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index fe7356d0e8..32e6a74da3 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -242,7 +242,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext auto block = ctx->get_free_block(&get_free_block); status = scanner->get_block(state, block, &eos); - VLOG_ROW << "VOlapScanNode input rows: " << block->rows() << ", eos: " << eos; + VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << eos; // The VFileScanner for external table may try to open not exist files, // Because FE file cache for external table may out of date. // So, NOT_FOUND for VFileScanner is not a fail case. @@ -250,7 +250,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext if (!status.ok() && (typeid(*scanner) != typeid(doris::vectorized::VFileScanner) || (typeid(*scanner) == typeid(doris::vectorized::VFileScanner) && !status.is<ErrorCode::NOT_FOUND>()))) { - LOG(WARNING) << "Scan thread read VOlapScanner failed: " << status.to_string(); + LOG(WARNING) << "Scan thread read VScanner failed: " << status.to_string(); // Add block ptr in blocks, prevent mem leak in read failed blocks.push_back(block); break; diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md index d27a176e96..3424eb8c29 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md @@ -35,6 +35,8 @@ under the License. ## 创建 Catalog +### 基于Hive Metastore创建Catalog + 和 Hive Catalog 基本一致,这里仅给出简单示例。其他示例可参阅 [Hive Catalog](./hive)。 ```sql @@ -50,6 +52,47 @@ CREATE CATALOG iceberg PROPERTIES ( ); ``` +### 基于Iceberg API创建Catalog + +使用Iceberg API访问元数据的方式,支持Hive、REST、Glue等服务作为Iceberg的Catalog。 + +- Hive Metastore作为元数据服务 + +```sql +CREATE CATALOG iceberg PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='hms', + 'hive.metastore.uris' = 'thrift://172.21.0.1:7004', + 'hadoop.username' = 'hive', + 'dfs.nameservices'='your-nameservice', + 'dfs.ha.namenodes.your-nameservice'='nn1,nn2', + 'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007', + 'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007', + 'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider' +); +``` + +- REST Catalog作为元数据服务 + +该方式需要预先提供REST服务,用户需实现获取Iceberg元数据的REST接口。 + +```sql +CREATE CATALOG iceberg PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://172.21.0.1:8181', +); +``` + +若数据存放在S3上,properties中可以使用以下参数 + +```sql +"AWS_ACCESS_KEY" = "username" +"AWS_SECRET_KEY" = "password" +"AWS_REGION" = "region-name" +"AWS_ENDPOINT" = "http://endpoint-uri" +``` + ## 列类型映射 和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive) 中 **列类型映射** 一节。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index c396ca268f..4b9f2a2e15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -35,6 +35,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -54,13 +55,12 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type"; public static final String ICEBERG_REST = "rest"; public static final String ICEBERG_HMS = "hms"; - protected final String icebergCatalogType; + protected String icebergCatalogType; protected Catalog catalog; protected SupportsNamespaces nsCatalog; - public IcebergExternalCatalog(long catalogId, String name, String type) { + public IcebergExternalCatalog(long catalogId, String name) { super(catalogId, name); - this.icebergCatalogType = type; } @Override @@ -152,7 +152,18 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { } } + public Catalog getCatalog() { + makeSureInitialized(); + return catalog; + } + + public SupportsNamespaces getNsCatalog() { + makeSureInitialized(); + return nsCatalog; + } + public String getIcebergCatalogType() { + makeSureInitialized(); return icebergCatalogType; } @@ -180,12 +191,13 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { @Override public List<Column> getSchema(String dbName, String tblName) { makeSureInitialized(); - List<Types.NestedField> columns = getIcebergTable(dbName, tblName).schema().columns(); + Schema schema = getIcebergTable(dbName, tblName).schema(); + List<Types.NestedField> columns = schema.columns(); List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); for (Types.NestedField field : columns) { tmpSchema.add(new Column(field.name(), icebergTypeToDorisType(field.type()), true, null, - true, field.doc(), true, -1)); + true, field.doc(), true, schema.caseInsensitiveFindField(field.name()).fieldId())); } return tmpSchema; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java index 62ad0c8729..4f97c3cd59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java @@ -32,9 +32,9 @@ public class IcebergExternalCatalogFactory { } switch (catalogType) { case IcebergExternalCatalog.ICEBERG_REST: - return new IcebergRestExternalCatalog(catalogId, name, resource, catalogType, props); + return new IcebergRestExternalCatalog(catalogId, name, resource, props); case IcebergExternalCatalog.ICEBERG_HMS: - return new IcebergHMSExternalCatalog(catalogId, name, resource, catalogType, props); + return new IcebergHMSExternalCatalog(catalogId, name, resource, props); default: throw new DdlException("Unknown " + IcebergExternalCatalog.ICEBERG_CATALOG_TYPE + " value: " + catalogType); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java index f969ae7085..97748c608b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java @@ -28,14 +28,14 @@ import java.util.Map; public class IcebergHMSExternalCatalog extends IcebergExternalCatalog { - public IcebergHMSExternalCatalog(long catalogId, String name, String resource, String catalogType, - Map<String, String> props) { - super(catalogId, name, catalogType); + public IcebergHMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) { + super(catalogId, name); catalogProperty = new CatalogProperty(resource, props); } @Override protected void initLocalObjectsImpl() { + icebergCatalogType = ICEBERG_HMS; HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); hiveCatalog.setConf(getConfiguration()); // initialize hive catalog diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java index f3f240b661..c0343f244c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.catalog.S3Resource; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider; @@ -30,24 +31,38 @@ import java.util.Map; public class IcebergRestExternalCatalog extends IcebergExternalCatalog { - public IcebergRestExternalCatalog(long catalogId, String name, String resource, String catalogType, - Map<String, String> props) { - super(catalogId, name, catalogType); + public IcebergRestExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) { + super(catalogId, name); catalogProperty = new CatalogProperty(resource, props); } @Override protected void initLocalObjectsImpl() { + icebergCatalogType = ICEBERG_REST; Map<String, String> restProperties = new HashMap<>(); String restUri = catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, ""); restProperties.put(CatalogProperties.URI, restUri); RESTCatalog restCatalog = new RESTCatalog(); - String credentials = catalogProperty.getProperties() - .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, DataLakeAWSCredentialsProvider.class.getName()); - Configuration conf = getConfiguration(); - conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials); + Configuration conf = replaceS3Properties(getConfiguration()); restCatalog.setConf(conf); restCatalog.initialize(icebergCatalogType, restProperties); catalog = restCatalog; } + + private Configuration replaceS3Properties(Configuration conf) { + Map<String, String> catalogProperties = catalogProperty.getProperties(); + String credentials = catalogProperties + .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, DataLakeAWSCredentialsProvider.class.getName()); + conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials); + String usePahStyle = catalogProperties.getOrDefault(S3Resource.USE_PATH_STYLE, "true"); + // Set path style + conf.set(S3Resource.USE_PATH_STYLE, usePahStyle); + conf.set(Constants.PATH_STYLE_ACCESS, usePahStyle); + // Get AWS client retry limit + conf.set(Constants.RETRY_LIMIT, catalogProperties.getOrDefault(Constants.RETRY_LIMIT, "1")); + conf.set(Constants.RETRY_THROTTLE_LIMIT, catalogProperties.getOrDefault(Constants.RETRY_THROTTLE_LIMIT, "1")); + conf.set(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT, + catalogProperties.getOrDefault(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT, "1")); + return conf; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index f613d6f986..fef1ed14dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -52,6 +52,7 @@ import org.apache.doris.datasource.EsExternalCatalog; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.JdbcExternalCatalog; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog; import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; @@ -172,6 +173,7 @@ public class GsonUtils { .registerSubtype(HMSExternalCatalog.class, HMSExternalCatalog.class.getSimpleName()) .registerSubtype(EsExternalCatalog.class, EsExternalCatalog.class.getSimpleName()) .registerSubtype(JdbcExternalCatalog.class, JdbcExternalCatalog.class.getSimpleName()) + .registerSubtype(IcebergExternalCatalog.class, IcebergExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergHMSExternalCatalog.class, IcebergHMSExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergRestExternalCatalog.class, IcebergRestExternalCatalog.class.getSimpleName()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org