This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b1b2697cc7 [fix](iceberg) fix iceberg catalog (#16372)
b1b2697cc7 is described below

commit b1b2697cc7c832071930ed721c9dadb7be396dab
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Sun Feb 5 13:15:28 2023 +0800

    [fix](iceberg) fix iceberg catalog (#16372)
    
    1. Fix Iceberg catalog access to S3
    2. Fix queries on partitioned tables in Iceberg catalogs
    3. Fix persistence of Iceberg catalogs
---
 be/src/vec/exec/format/generic_reader.h            |  2 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  4 ++
 be/src/vec/exec/format/table/iceberg_reader.h      |  2 +
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  4 +-
 docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md | 43 ++++++++++++++++++++++
 .../datasource/iceberg/IcebergExternalCatalog.java | 22 ++++++++---
 .../iceberg/IcebergExternalCatalogFactory.java     |  4 +-
 .../iceberg/IcebergHMSExternalCatalog.java         |  6 +--
 .../iceberg/IcebergRestExternalCatalog.java        | 29 +++++++++++----
 .../org/apache/doris/persist/gson/GsonUtils.java   |  2 +
 10 files changed, 98 insertions(+), 20 deletions(-)

diff --git a/be/src/vec/exec/format/generic_reader.h 
b/be/src/vec/exec/format/generic_reader.h
index 9f4cfd00ee..5abf064813 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -47,7 +47,7 @@ public:
 
     /// If the underlying FileReader has filled the partition&missing columns,
     /// The FileScanner does not need to fill
-    bool fill_all_columns() const { return _fill_all_columns; }
+    virtual bool fill_all_columns() const { return _fill_all_columns; }
 
     /// Tell the underlying FileReader the partition&missing columns,
     /// and the FileReader determine to fill columns or not.
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index adc31e605b..0035323432 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -114,6 +114,10 @@ Status IcebergTableReader::set_fill_columns(
     return _file_format_reader->set_fill_columns(partition_columns, 
missing_columns);
 }
 
+/// Forward to the wrapped file-format reader: set_fill_columns() hands the
+/// partition/missing columns to that reader, so it is the one that decides
+/// whether it fills them and the FileScanner can skip filling.
+bool IcebergTableReader::fill_all_columns() const {
+    return _file_format_reader->fill_all_columns();
+}
+
 Status IcebergTableReader::get_columns(
         std::unordered_map<std::string, TypeDescriptor>* name_to_type,
         std::unordered_set<std::string>* missing_cols) {
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h 
b/be/src/vec/exec/format/table/iceberg_reader.h
index 93a6963c80..f9e480f28b 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -53,6 +53,8 @@ public:
                     partition_columns,
             const std::unordered_map<std::string, VExprContext*>& 
missing_columns) override;
 
+    bool fill_all_columns() const override;
+
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index fe7356d0e8..32e6a74da3 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -242,7 +242,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler, ScannerContext
 
         auto block = ctx->get_free_block(&get_free_block);
         status = scanner->get_block(state, block, &eos);
-        VLOG_ROW << "VOlapScanNode input rows: " << block->rows() << ", eos: " 
<< eos;
+        VLOG_ROW << "VScanNode input rows: " << block->rows() << ", eos: " << 
eos;
         // The VFileScanner for external table may try to open not exist files,
         // Because FE file cache for external table may out of date.
         // So, NOT_FOUND for VFileScanner is not a fail case.
@@ -250,7 +250,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* 
scheduler, ScannerContext
         if (!status.ok() && (typeid(*scanner) != 
typeid(doris::vectorized::VFileScanner) ||
                              (typeid(*scanner) == 
typeid(doris::vectorized::VFileScanner) &&
                               !status.is<ErrorCode::NOT_FOUND>()))) {
-            LOG(WARNING) << "Scan thread read VOlapScanner failed: " << 
status.to_string();
+            LOG(WARNING) << "Scan thread read VScanner failed: " << 
status.to_string();
             // Add block ptr in blocks, prevent mem leak in read failed
             blocks.push_back(block);
             break;
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md 
b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
index d27a176e96..3424eb8c29 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
@@ -35,6 +35,8 @@ under the License.
 
 ## 创建 Catalog
 
+### 基于Hive Metastore创建Catalog
+
 和 Hive Catalog 基本一致,这里仅给出简单示例。其他示例可参阅 [Hive Catalog](./hive)。
 
 ```sql
@@ -50,6 +52,47 @@ CREATE CATALOG iceberg PROPERTIES (
 );
 ```
 
+### 基于Iceberg API创建Catalog
+
+使用Iceberg API访问元数据的方式，目前支持Hive Metastore和REST服务作为Iceberg的Catalog（`iceberg.catalog.type`分别设为`hms`、`rest`）。
+
+- Hive Metastore作为元数据服务
+
+```sql
+CREATE CATALOG iceberg PROPERTIES (
+    'type'='iceberg',
+    'iceberg.catalog.type'='hms',
+    'hive.metastore.uris' = 'thrift://172.21.0.1:7004',
+    'hadoop.username' = 'hive',
+    'dfs.nameservices'='your-nameservice',
+    'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
+    'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
+    'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
+    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
+);
+```
+
+- REST Catalog作为元数据服务
+
+该方式需要预先提供REST服务,用户需实现获取Iceberg元数据的REST接口。
+
+```sql
+CREATE CATALOG iceberg PROPERTIES (
+    'type'='iceberg',
+    'iceberg.catalog.type'='rest',
+    'uri' = 'http://172.21.0.1:8181',
+);
+```
+
+若数据存放在S3上，可以在PROPERTIES中添加以下参数：
+
+```sql
+"AWS_ACCESS_KEY" = "username",
+"AWS_SECRET_KEY" = "password",
+"AWS_REGION" = "region-name",
+"AWS_ENDPOINT" = "http://endpoint-uri"
+```
+
 ## 列类型映射
 
 和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive) 中 **列类型映射** 一节。
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index c396ca268f..4b9f2a2e15 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -54,13 +55,12 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
     public static final String ICEBERG_REST = "rest";
     public static final String ICEBERG_HMS = "hms";
-    protected final String icebergCatalogType;
+    protected String icebergCatalogType;
     protected Catalog catalog;
     protected SupportsNamespaces nsCatalog;
 
-    public IcebergExternalCatalog(long catalogId, String name, String type) {
+    /**
+     * Creates the catalog shell. The Iceberg catalog type is no longer passed in here:
+     * each subclass assigns {@code icebergCatalogType} itself in initLocalObjectsImpl().
+     *
+     * @param catalogId id of this catalog, forwarded to ExternalCatalog
+     * @param name name of this catalog, forwarded to ExternalCatalog
+     */
+    public IcebergExternalCatalog(long catalogId, String name) {
         super(catalogId, name);
-        this.icebergCatalogType = type;
     }
 
     @Override
@@ -152,7 +152,18 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
         }
     }
 
+    /**
+     * Returns the underlying Iceberg {@link Catalog}. Calls makeSureInitialized() first because the
+     * field is assigned by subclasses in initLocalObjectsImpl() (presumably triggered from
+     * makeSureInitialized() — confirm), not in the constructor.
+     */
+    public Catalog getCatalog() {
+        makeSureInitialized();
+        return catalog;
+    }
+
+    /**
+     * Returns the namespace-capable view of the catalog ({@code nsCatalog}) after calling
+     * makeSureInitialized().
+     * NOTE(review): where nsCatalog is assigned is not visible in this patch — confirm it is set
+     * for every catalog type, otherwise this may return null.
+     */
+    public SupportsNamespaces getNsCatalog() {
+        makeSureInitialized();
+        return nsCatalog;
+    }
+
+    /**
+     * Returns the Iceberg catalog type ({@code ICEBERG_HMS} or {@code ICEBERG_REST}).
+     * The field is no longer set by the constructor; the subclasses assign it in
+     * initLocalObjectsImpl(), so the catalog must be initialized before reading it.
+     */
     public String getIcebergCatalogType() {
+        makeSureInitialized();
         return icebergCatalogType;
     }
 
@@ -180,12 +191,13 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     @Override
     public List<Column> getSchema(String dbName, String tblName) {
         makeSureInitialized();
-        List<Types.NestedField> columns = getIcebergTable(dbName, 
tblName).schema().columns();
+        Schema schema = getIcebergTable(dbName, tblName).schema();
+        List<Types.NestedField> columns = schema.columns();
         List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(columns.size());
         for (Types.NestedField field : columns) {
             tmpSchema.add(new Column(field.name(),
                     icebergTypeToDorisType(field.type()), true, null,
-                    true, field.doc(), true, -1));
+                    true, field.doc(), true, 
schema.caseInsensitiveFindField(field.name()).fieldId()));
         }
         return tmpSchema;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
index 62ad0c8729..4f97c3cd59 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalogFactory.java
@@ -32,9 +32,9 @@ public class IcebergExternalCatalogFactory {
         }
         switch (catalogType) {
             case IcebergExternalCatalog.ICEBERG_REST:
-                return new IcebergRestExternalCatalog(catalogId, name, 
resource, catalogType, props);
+                return new IcebergRestExternalCatalog(catalogId, name, 
resource, props);
             case IcebergExternalCatalog.ICEBERG_HMS:
-                return new IcebergHMSExternalCatalog(catalogId, name, 
resource, catalogType, props);
+                return new IcebergHMSExternalCatalog(catalogId, name, 
resource, props);
             default:
                 throw new DdlException("Unknown " + 
IcebergExternalCatalog.ICEBERG_CATALOG_TYPE
                     + " value: " + catalogType);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
index f969ae7085..97748c608b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -28,14 +28,14 @@ import java.util.Map;
 
 public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
 
-    public IcebergHMSExternalCatalog(long catalogId, String name, String 
resource, String catalogType,
-                                     Map<String, String> props) {
-        super(catalogId, name, catalogType);
+    public IcebergHMSExternalCatalog(long catalogId, String name, String 
resource, Map<String, String> props) {
+        super(catalogId, name);
         catalogProperty = new CatalogProperty(resource, props);
     }
 
     @Override
     protected void initLocalObjectsImpl() {
+        icebergCatalogType = ICEBERG_HMS;
         HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
         hiveCatalog.setConf(getConfiguration());
         // initialize hive catalog
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
index f3f240b661..c0343f244c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.catalog.S3Resource;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider;
 
@@ -30,24 +31,38 @@ import java.util.Map;
 
 public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
 
-    public  IcebergRestExternalCatalog(long catalogId, String name, String 
resource, String catalogType,
-                                       Map<String, String> props) {
-        super(catalogId, name, catalogType);
+    public  IcebergRestExternalCatalog(long catalogId, String name, String 
resource, Map<String, String> props) {
+        super(catalogId, name);
         catalogProperty = new CatalogProperty(resource, props);
     }
 
     @Override
     protected void initLocalObjectsImpl() {
+        icebergCatalogType = ICEBERG_REST;
         Map<String, String> restProperties = new HashMap<>();
         String restUri = 
catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, "");
         restProperties.put(CatalogProperties.URI, restUri);
         RESTCatalog restCatalog = new RESTCatalog();
-        String credentials = catalogProperty.getProperties()
-                .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, 
DataLakeAWSCredentialsProvider.class.getName());
-        Configuration conf = getConfiguration();
-        conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials);
+        Configuration conf = replaceS3Properties(getConfiguration());
         restCatalog.setConf(conf);
         restCatalog.initialize(icebergCatalogType, restProperties);
         catalog = restCatalog;
     }
+
+    private Configuration replaceS3Properties(Configuration conf) {
+        Map<String, String> catalogProperties = 
catalogProperty.getProperties();
+        String credentials = catalogProperties
+                .getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, 
DataLakeAWSCredentialsProvider.class.getName());
+        conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials);
+        String usePahStyle = 
catalogProperties.getOrDefault(S3Resource.USE_PATH_STYLE, "true");
+        // Set path style
+        conf.set(S3Resource.USE_PATH_STYLE, usePahStyle);
+        conf.set(Constants.PATH_STYLE_ACCESS, usePahStyle);
+        // Get AWS client retry limit
+        conf.set(Constants.RETRY_LIMIT, 
catalogProperties.getOrDefault(Constants.RETRY_LIMIT, "1"));
+        conf.set(Constants.RETRY_THROTTLE_LIMIT, 
catalogProperties.getOrDefault(Constants.RETRY_THROTTLE_LIMIT, "1"));
+        conf.set(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT,
+                
catalogProperties.getOrDefault(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT, "1"));
+        return conf;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index f613d6f986..fef1ed14dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -52,6 +52,7 @@ import org.apache.doris.datasource.EsExternalCatalog;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.datasource.JdbcExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergHMSExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
 import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
@@ -172,6 +173,7 @@ public class GsonUtils {
             .registerSubtype(HMSExternalCatalog.class, 
HMSExternalCatalog.class.getSimpleName())
             .registerSubtype(EsExternalCatalog.class, 
EsExternalCatalog.class.getSimpleName())
             .registerSubtype(JdbcExternalCatalog.class, 
JdbcExternalCatalog.class.getSimpleName())
+            .registerSubtype(IcebergExternalCatalog.class, 
IcebergExternalCatalog.class.getSimpleName())
             .registerSubtype(IcebergHMSExternalCatalog.class, 
IcebergHMSExternalCatalog.class.getSimpleName())
             .registerSubtype(IcebergRestExternalCatalog.class, 
IcebergRestExternalCatalog.class.getSimpleName());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to