This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch dev-1.1.2 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push: new 3506ebf837 [data lake]Support hdfs ha for Iceberg table. (#11002) 3506ebf837 is described below commit 3506ebf83782c6e3dc06e17ed5666f2eae93b926 Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Wed Jul 20 19:03:58 2022 +0800 [data lake]Support hdfs ha for Iceberg table. (#11002) * Support Iceberg on HDFS with HA mode enabled. --- .../org/apache/doris/catalog/IcebergProperty.java | 21 +++++++++++++++++++++ .../java/org/apache/doris/catalog/IcebergTable.java | 7 +++---- .../doris/external/iceberg/IcebergCatalogMgr.java | 11 +++++++++++ 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java index ffc933eb7a..882524a575 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergProperty.java @@ -17,6 +17,9 @@ package org.apache.doris.catalog; +import com.google.common.collect.Maps; + +import java.util.Iterator; import java.util.Map; /** @@ -27,6 +30,7 @@ public class IcebergProperty { public static final String ICEBERG_TABLE = "iceberg.table"; public static final String ICEBERG_HIVE_METASTORE_URIS = "iceberg.hive.metastore.uris"; public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type"; + public static final String ICEBERG_HDFS_PREFIX = "dfs"; private boolean exist; @@ -34,6 +38,17 @@ public class IcebergProperty { private String table; private String hiveMetastoreUris; private String catalogType; + private Map<String, String> dfsProperties = Maps.newHashMap(); + + private void initDfsProperties(Map<String, String> properties) { + Iterator<Map.Entry<String, String>> iterator = properties.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, String> entry = iterator.next(); + if (entry.getKey().startsWith(ICEBERG_HDFS_PREFIX)) { + dfsProperties.put(entry.getKey(), entry.getValue()); + } + } + } public IcebergProperty(Map<String, String> properties) { if (properties != null && !properties.isEmpty()) { @@ -42,6 +57,7 @@ public class IcebergProperty { this.table = properties.get(ICEBERG_TABLE); this.hiveMetastoreUris = properties.get(ICEBERG_HIVE_METASTORE_URIS); this.catalogType = properties.get(ICEBERG_CATALOG_TYPE); + initDfsProperties(properties); } else { this.exist = false; } @@ -54,6 +70,7 @@ public class IcebergProperty { this.table = otherProperty.table; this.hiveMetastoreUris = otherProperty.hiveMetastoreUris; this.catalogType = otherProperty.catalogType; + this.dfsProperties = otherProperty.dfsProperties; } public boolean isExist() { @@ -83,4 +100,8 @@ public class IcebergProperty { public void setTable(String table) { this.table = table; } + + public Map<String, String> getDfsProperties() { + return dfsProperties; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java index 7fbabd38b3..ba0463e5f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java @@ -88,10 +88,9 @@ public class IcebergTable extends Table { this.icebergDb = icebergProperty.getDatabase(); this.icebergTbl = icebergProperty.getTable(); - icebergProperties.put(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS, - icebergProperty.getHiveMetastoreUris()); - icebergProperties.put(IcebergProperty.ICEBERG_CATALOG_TYPE, - icebergProperty.getCatalogType()); + icebergProperties.put(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS, icebergProperty.getHiveMetastoreUris()); + icebergProperties.put(IcebergProperty.ICEBERG_CATALOG_TYPE, icebergProperty.getCatalogType()); + icebergProperties.putAll(icebergProperty.getDfsProperties()); this.icebergTable = icebergTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java index bbc6b26922..503947e362 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java @@ -35,6 +35,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -128,6 +129,16 @@ public class IcebergCatalogMgr { copiedProps.remove(ICEBERG_TABLE); } + if (!copiedProps.isEmpty()) { + Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry<String, String> entry = iter.next(); + if (entry.getKey().startsWith(IcebergProperty.ICEBERG_HDFS_PREFIX)) { + iter.remove(); + } + } + } + if (!copiedProps.isEmpty()) { throw new DdlException("Unknown table properties: " + copiedProps.toString()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org