This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new 3506ebf837 [data lake]Support hdfs ha for Iceberg table. (#11002)
3506ebf837 is described below

commit 3506ebf83782c6e3dc06e17ed5666f2eae93b926
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Wed Jul 20 19:03:58 2022 +0800

    [data lake]Support hdfs ha for Iceberg table. (#11002)
    
    * Support Iceberg on HDFS with HA mode enabled.
---
 .../org/apache/doris/catalog/IcebergProperty.java   | 21 +++++++++++++++++++++
 .../java/org/apache/doris/catalog/IcebergTable.java |  7 +++----
 .../doris/external/iceberg/IcebergCatalogMgr.java   | 11 +++++++++++
 3 files changed, 35 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java
index ffc933eb7a..882524a575 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java
@@ -17,6 +17,9 @@
 
 package org.apache.doris.catalog;
 
+import com.google.common.collect.Maps;
+
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -27,6 +30,7 @@ public class IcebergProperty {
     public static final String ICEBERG_TABLE = "iceberg.table";
     public static final String ICEBERG_HIVE_METASTORE_URIS = 
"iceberg.hive.metastore.uris";
     public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type";
+    public static final String ICEBERG_HDFS_PREFIX = "dfs";
 
     private boolean exist;
 
@@ -34,6 +38,17 @@ public class IcebergProperty {
     private String table;
     private String hiveMetastoreUris;
     private String catalogType;
+    private Map<String, String> dfsProperties = Maps.newHashMap();
+
+    private void initDfsProperties(Map<String, String> properties) {
+        Iterator<Map.Entry<String, String>> iterator = 
properties.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
+            if (entry.getKey().startsWith(ICEBERG_HDFS_PREFIX)) {
+                dfsProperties.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
 
     public IcebergProperty(Map<String, String> properties) {
         if (properties != null && !properties.isEmpty()) {
@@ -42,6 +57,7 @@ public class IcebergProperty {
             this.table = properties.get(ICEBERG_TABLE);
             this.hiveMetastoreUris = 
properties.get(ICEBERG_HIVE_METASTORE_URIS);
             this.catalogType = properties.get(ICEBERG_CATALOG_TYPE);
+            initDfsProperties(properties);
         } else {
             this.exist = false;
         }
@@ -54,6 +70,7 @@ public class IcebergProperty {
         this.table = otherProperty.table;
         this.hiveMetastoreUris = otherProperty.hiveMetastoreUris;
         this.catalogType = otherProperty.catalogType;
+        this.dfsProperties = otherProperty.dfsProperties;
     }
 
     public boolean isExist() {
@@ -83,4 +100,8 @@ public class IcebergProperty {
     public void setTable(String table) {
         this.table = table;
     }
+
+    public Map<String, String> getDfsProperties() {
+        return dfsProperties;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
index 7fbabd38b3..ba0463e5f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
@@ -88,10 +88,9 @@ public class IcebergTable extends Table {
         this.icebergDb = icebergProperty.getDatabase();
         this.icebergTbl = icebergProperty.getTable();
 
-        icebergProperties.put(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS,
-                icebergProperty.getHiveMetastoreUris());
-        icebergProperties.put(IcebergProperty.ICEBERG_CATALOG_TYPE,
-                icebergProperty.getCatalogType());
+        icebergProperties.put(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS, 
icebergProperty.getHiveMetastoreUris());
+        icebergProperties.put(IcebergProperty.ICEBERG_CATALOG_TYPE, 
icebergProperty.getCatalogType());
+        icebergProperties.putAll(icebergProperty.getDfsProperties());
         this.icebergTable = icebergTable;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java
index bbc6b26922..503947e362 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -128,6 +129,16 @@ public class IcebergCatalogMgr {
             copiedProps.remove(ICEBERG_TABLE);
         }
 
+        if (!copiedProps.isEmpty()) {
+            Iterator<Map.Entry<String, String>> iter = 
copiedProps.entrySet().iterator();
+            while (iter.hasNext()) {
+                Map.Entry<String, String> entry = iter.next();
+                if 
(entry.getKey().startsWith(IcebergProperty.ICEBERG_HDFS_PREFIX)) {
+                    iter.remove();
+                }
+            }
+        }
+
         if (!copiedProps.isEmpty()) {
             throw new DdlException("Unknown table properties: " + 
copiedProps.toString());
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to