This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-unstable
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3da8dfb658365bbcf3e4a55545ea88153842eec5
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Tue Nov 8 14:02:41 2022 +0800

    [feature](multi-catalog) Support data on s3-compatible oss and support 
aliyun DLF (#13994)
    
    Support Aliyun DLF
    Support data on s3-compatible object storage, such as aliyun oss.
    Refactor some interface of catalog, to make it more tidy.
    Fix bug that the default text format field delimiter of hive should be \x01
    Add a new class PooledHiveMetaStoreClient to wrap the IMetaStoreClient.
---
 .asf.yaml                                          |   1 +
 build.sh                                           |  17 ++-
 .../docs/ecosystem/external-table/multi-catalog.md |  68 +++++++++-
 .../Create/CREATE-TABLE-AS-SELECT.md               |   6 +
 .../docs/ecosystem/external-table/multi-catalog.md |  71 +++++++++-
 .../Create/CREATE-TABLE-AS-SELECT.md               |   6 +
 .../org/apache/doris/analysis/CreateTableStmt.java |   2 +-
 .../doris/catalog/HiveMetaStoreClientHelper.java   |  92 ++++++-------
 .../doris/catalog/external/HMSExternalTable.java   |  46 ++-----
 .../java/org/apache/doris/common/FeConstants.java  |  10 ++
 .../org/apache/doris/datasource/CatalogMgr.java    |   2 +-
 .../apache/doris/datasource/EsExternalCatalog.java |  31 +----
 .../apache/doris/datasource/ExternalCatalog.java   |  48 ++++++-
 .../doris/datasource/HMSExternalCatalog.java       |  83 +++---------
 .../apache/doris/datasource/InternalCatalog.java   |  35 +++--
 .../datasource/PooledHiveMetaStoreClient.java      | 146 +++++++++++++++++++++
 .../doris/planner/external/HiveScanProvider.java   |  49 +++++--
 .../doris/planner/external/QueryScanProvider.java  |   3 +
 18 files changed, 506 insertions(+), 210 deletions(-)

diff --git a/.asf.yaml b/.asf.yaml
index 532061b89d..e83a5f1421 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -58,6 +58,7 @@ github:
           - Build Broker
           - Build Documents
           - BE UT (Clang)
+          - ShellCheck
 
       required_pull_request_reviews:
         dismiss_stale_reviews: true
diff --git a/build.sh b/build.sh
index 65559f026e..aba8a7f565 100755
--- a/build.sh
+++ b/build.sh
@@ -50,9 +50,10 @@ Usage: $0 <options>
      -j                 build Backend parallel
 
   Environment variables:
-    USE_AVX2            If the CPU does not support AVX2 instruction set, 
please set USE_AVX2=0. Default is ON.
-    STRIP_DEBUG_INFO    If set STRIP_DEBUG_INFO=ON, the debug information in 
the compiled binaries will be stored separately in the 'be/lib/debug_info' 
directory. Default is OFF.
-    DISABLE_JAVA_UDF    If set DISABLE_JAVA_UDF=ON, we will do not build 
binary with java-udf. Default is OFF.
+    USE_AVX2                    If the CPU does not support AVX2 instruction 
set, please set USE_AVX2=0. Default is ON.
+    STRIP_DEBUG_INFO            If set STRIP_DEBUG_INFO=ON, the debug 
information in the compiled binaries will be stored separately in the 
'be/lib/debug_info' directory. Default is OFF.
+    DISABLE_JAVA_UDF            If set DISABLE_JAVA_UDF=ON, we will not 
build binary with java-udf. Default is OFF.
+    DISABLE_JAVA_CHECK_STYLE    If set DISABLE_JAVA_CHECK_STYLE=ON, it will 
skip style check of java code in FE.
   Eg.
     $0                                      build all
     $0 --be                                 build Backend
@@ -289,6 +290,10 @@ if [[ -z "${DISABLE_JAVA_UDF}" ]]; then
     DISABLE_JAVA_UDF='OFF'
 fi
 
+if [[ -z "${DISABLE_JAVA_CHECK_STYLE}" ]]; then
+    DISABLE_JAVA_CHECK_STYLE='OFF'
+fi
+
 if [[ -z "${RECORD_COMPILER_SWITCHES}" ]]; then
     RECORD_COMPILER_SWITCHES='OFF'
 fi
@@ -463,7 +468,11 @@ if [[ "${FE_MODULES}" != '' ]]; then
     if [[ "${CLEAN}" -eq 1 ]]; then
         clean_fe
     fi
-    "${MVN_CMD}" package -pl ${FE_MODULES:+${FE_MODULES}} -DskipTests
+    if [[ "${DISABLE_JAVA_CHECK_STYLE}" = "ON" ]]; then
+        "${MVN_CMD}" package -pl ${FE_MODULES:+${FE_MODULES}} -DskipTests 
-Dcheckstyle.skip=true
+    else
+        "${MVN_CMD}" package -pl ${FE_MODULES:+${FE_MODULES}} -DskipTests
+    fi
     cd "${DORIS_HOME}"
 fi
 
diff --git a/docs/en/docs/ecosystem/external-table/multi-catalog.md 
b/docs/en/docs/ecosystem/external-table/multi-catalog.md
index 7cb081a6d7..905993f608 100644
--- a/docs/en/docs/ecosystem/external-table/multi-catalog.md
+++ b/docs/en/docs/ecosystem/external-table/multi-catalog.md
@@ -24,6 +24,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+<version since="1.2.0">
+
 # Multi-Catalog
 
 Multi-Catalog is a feature introduced in Doris 1.2.0, which aims to make it 
easier to interface with external data sources to enhance Doris' data lake 
analysis and federated data query capabilities.
@@ -292,7 +294,7 @@ mysql> select * from test;
 +------------+-------------+--------+-------+
 ```
 
-## Parameters that:
+#### Parameters:
 
 Parameter | Description
 ---|---
@@ -304,6 +306,68 @@ Parameter | Description
 **elasticsearch.nodes_discovery** | Whether or not to enable ES node 
discovery, the default is true. In network isolation, set this parameter to 
false. Only the specified node is connected.
 **elasticsearch.ssl** | Whether ES cluster enables https access mode, the 
current FE/BE implementation is to trust all
 
+### Connect Aliyun Data Lake Formation
+
+> [What is Data Lake 
Formation](https://www.alibabacloud.com/product/datalake-formation)
+
+1. Create hive-site.xml
+
+       Create hive-site.xml and put it in `fe/conf` and `be/conf`.
+       
+       ```
+       <?xml version="1.0"?>
+       <configuration>
+           <!--Set to use dlf client-->
+           <property>
+               <name>hive.metastore.type</name>
+               <value>dlf</value>
+           </property>
+           <property>
+               <name>dlf.catalog.endpoint</name>
+               <value>dlf-vpc.cn-beijing.aliyuncs.com</value>
+           </property>
+           <property>
+               <name>dlf.catalog.region</name>
+               <value>cn-beijing</value>
+           </property>
+           <property>
+               <name>dlf.catalog.proxyMode</name>
+               <value>DLF_ONLY</value>
+           </property>
+           <property>
+               <name>dlf.catalog.uid</name>
+               <value>20000000000000000</value>
+           </property>
+           <property>
+               <name>dlf.catalog.accessKeyId</name>
+               <value>XXXXXXXXXXXXXXX</value>
+           </property>
+           <property>
+               <name>dlf.catalog.accessKeySecret</name>
+               <value>XXXXXXXXXXXXXXXXX</value>
+           </property>
+       </configuration>
+       ```
+
+       * `dlf.catalog.endpoint`: DLF Endpoint. See: [Regions and endpoints of 
DLF](https://www.alibabacloud.com/help/en/data-lake-formation/latest/regions-and-endpoints)
+       * `dlf.catalog.region`: DLF Region. See: [Regions and endpoints of 
DLF](https://www.alibabacloud.com/help/en/data-lake-formation/latest/regions-and-endpoints)
+       * `dlf.catalog.uid`: Ali Cloud Account ID. That is, the "cloud account 
ID" of the personal information in the upper right corner of the Alibaba Cloud 
console.
+       * `dlf.catalog.accessKeyId`: AccessKey. See: [Ali Cloud 
Console](https://ram.console.aliyun.com/manage/ak).
+       * `dlf.catalog.accessKeySecret`: SecretKey. See: [Ali Cloud 
Console](https://ram.console.aliyun.com/manage/ak).
+
+       Other configuration items are fixed values and do not need to be 
changed.
+
+2. Restart FE and create a catalog with the `CREATE CATALOG` statement.
+
+       ```
+       CREATE CATALOG dlf PROPERTIES (
+           "type"="hms",
+           "hive.metastore.uris" = "thrift://127.0.0.1:9083"
+       );
+       ```
+       
+       where `type` is fixed to `hms`. The value of `hive.metastore.uris` can 
be filled in at will, but it will not be used in practice. But it needs to be 
filled in the standard hive metastore thrift uri format.
+
+After that, the metadata under DLF can be accessed like a normal Hive 
MetaStore.
 
 ## Column Type Mapping
 
@@ -366,3 +430,5 @@ Metadata changes of external data sources, such as 
creating, dropping tables, ad
 Currently, users need to manually refresh metadata via the [REFRESH 
CATALOG](../../sql-manual/sql-reference/Utility-Statements/REFRESH-CATALOG.md) 
command.
 
 Automatic synchronization of metadata will be supported soon.
+
+</version>
\ No newline at end of file
diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE-AS-SELECT.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE-AS-SELECT.md
index 7498fdaa98..9589e6a33b 100644
--- 
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE-AS-SELECT.md
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE-AS-SELECT.md
@@ -55,6 +55,12 @@ illustrate:
 - After a table is created, data is imported. If the import fails, the table 
is deleted
 - You can specify the key type. The default key type is `Duplicate Key`
 
+<version since='1.2'>
+
+- If the created source is an external table and the first column is of type 
String, the first column is automatically set to VARCHAR(65533). This is 
because Doris internal tables do not allow a String column as the first column.
+
+</version>
+
 ### Example
 
 1. Using the field names in the SELECT statement
diff --git a/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md 
b/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md
index 5b79d51821..0d46d212d6 100644
--- a/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md
+++ b/docs/zh-CN/docs/ecosystem/external-table/multi-catalog.md
@@ -24,6 +24,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+<version since="1.2.0">
+
 # 多源数据目录
 
 多源数据目录(Multi-Catalog)是 Doris 1.2.0 
版本中推出的功能,旨在能够更方便对接外部数据目录,以增强Doris的数据湖分析和联邦数据查询能力。
@@ -292,7 +294,7 @@ mysql> select * from test;
 +------------+-------------+--------+-------+
 ```
 
-## 参数说明:
+#### 参数说明:
 
 参数 | 说明
 ---|---
@@ -304,6 +306,71 @@ mysql> select * from test;
 **elasticsearch.nodes_discovery** | 是否开启 ES 节点发现,默认为 true,在网络隔离环境下设置为 
false,只连接指定节点
 **elasticsearch.ssl** | ES 是否开启 https 访问模式,目前在 fe/be 实现方式为信任所有
 
+### 连接阿里云 Data Lake Formation
+
+> [什么是 Data Lake Formation](https://www.aliyun.com/product/bigdata/dlf)
+
+1. 创建 hive-site.xml
+
+       创建 hive-site.xml 文件,并将其放置在 `fe/conf` 和 `be/conf` 目录下。
+       
+       ```
+       <?xml version="1.0"?>
+       <configuration>
+           <!--Set to use dlf client-->
+           <property>
+               <name>hive.metastore.type</name>
+               <value>dlf</value>
+           </property>
+           <property>
+               <name>dlf.catalog.endpoint</name>
+               <value>dlf-vpc.cn-beijing.aliyuncs.com</value>
+           </property>
+           <property>
+               <name>dlf.catalog.region</name>
+               <value>cn-beijing</value>
+           </property>
+           <property>
+               <name>dlf.catalog.proxyMode</name>
+               <value>DLF_ONLY</value>
+           </property>
+           <property>
+               <name>dlf.catalog.uid</name>
+               <value>20000000000000000</value>
+           </property>
+           <property>
+               <name>dlf.catalog.accessKeyId</name>
+               <value>XXXXXXXXXXXXXXX</value>
+           </property>
+           <property>
+               <name>dlf.catalog.accessKeySecret</name>
+               <value>XXXXXXXXXXXXXXXXX</value>
+           </property>
+       </configuration>
+       ```
+
+       * `dlf.catalog.endpoint`:DLF Endpoint,参阅:[DLF 
Region和Endpoint对照表](https://www.alibabacloud.com/help/zh/data-lake-formation/latest/regions-and-endpoints)
+       * `dlf.catalog.region`:DLF Region,参阅:[DLF 
Region和Endpoint对照表](https://www.alibabacloud.com/help/zh/data-lake-formation/latest/regions-and-endpoints)
+       * `dlf.catalog.uid`:阿里云账号。即阿里云控制台右上角个人信息的“云账号ID”。
+       * `dlf.catalog.accessKeyId`:AccessKey。可以在 
[阿里云控制台](https://ram.console.aliyun.com/manage/ak) 中创建和管理。
+       * `dlf.catalog.accessKeySecret`:SecretKey。可以在 
[阿里云控制台](https://ram.console.aliyun.com/manage/ak) 中创建和管理。
+
+       其他配置项为固定值,无需改动。
+
+2. 重启 FE,并通过 `CREATE CATALOG` 语句创建 catalog。
+
+       ```
+       CREATE CATALOG dlf PROPERTIES (
+           "type"="hms",
+           "hive.metastore.uris" = "thrift://127.0.0.1:9083"
+       );
+       ```
+       
+       其中 `type` 固定为 `hms`。 `hive.metastore.uris` 的值随意填写即可,实际不会使用。但需要按照标准 hive 
metastore thrift uri 格式填写。
+       
+之后,可以像正常的 Hive MetaStore 一样,访问 DLF 下的元数据。 
+
+
 ## 列类型映射
 
 用户创建 Catalog 后,Doris 会自动同步数据目录的数据库和表,针对不同的数据目录和数据表格式,Doris 会进行以下列映射关系。
@@ -365,3 +432,5 @@ Doris 的权限管理功能提供了对 Cataloig 层级的扩展,具体可参
 目前需要用户通过 [REFRESH 
CATALOG](../../sql-manual/sql-reference/Utility-Statements/REFRESH-CATALOG.md) 
命令手动刷新元数据。
 
 后续会支持元数据的自动同步。
+
+</version>
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE-AS-SELECT.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE-AS-SELECT.md
index 82938f9315..f2f3f3a834 100644
--- 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE-AS-SELECT.md
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE-AS-SELECT.md
@@ -55,6 +55,12 @@ CREATE TABLE table_name [( column_name_list )]
 - 创建表成功后,会进行数据导入,如果导入失败,将会删除表
 - 可以自行指定 key type,默认为`Duplicate Key`
 
+<version since='1.2'>
+
+- 如果创建的来源为外部表,并且第一列为 String 类型,则会自动将第一列设置为 VARCHAR(65533)。因为 Doris 内部表,不允许 
String 列作为第一列。
+
+</version>
+
 ### Example
 
 1. 使用 select 语句中的字段名
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index bf3b06783c..7ba06377a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -332,7 +332,7 @@ public class CreateTableStmt extends DdlStmt {
                         }
                         keysColumnNames.add(columnDef.getName());
                     }
-                    // The OLAP table must has at least one short key and the 
float and double should not be short key.
+                    // The OLAP table must have at least one short key and the 
float and double should not be short key.
                     // So the float and double could not be the first column 
in OLAP table.
                     if (keysColumnNames.isEmpty()) {
                         throw new AnalysisException("The olap table first 
column could not be float, double, string"
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index dbc402b10d..8440b5e6a2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -39,6 +39,7 @@ import org.apache.doris.thrift.TExprOpcode;
 
 import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
 import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -91,7 +92,10 @@ import java.util.stream.Collectors;
  */
 public class HiveMetaStoreClientHelper {
     private static final Logger LOG = 
LogManager.getLogger(HiveMetaStoreClientHelper.class);
+
+    public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
     public static final String HIVE_METASTORE_TYPE = "hive.metastore.type";
+    public static final String DLF_TYPE = "dlf";
 
     private static final Pattern digitPattern = Pattern.compile("(\\d+)");
 
@@ -142,16 +146,12 @@ public class HiveMetaStoreClientHelper {
     public static IMetaStoreClient getClient(String metaStoreUris) throws 
DdlException {
         HiveConf hiveConf = new HiveConf();
         hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUris);
-        return getClient(hiveConf);
-    }
-
-    public static IMetaStoreClient getClient(HiveConf hiveConf) throws 
DdlException {
         hiveConf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
                 String.valueOf(Config.hive_metastore_client_timeout_second));
         IMetaStoreClient metaStoreClient = null;
         String type = hiveConf.get(HIVE_METASTORE_TYPE);
         try {
-            if (type.equalsIgnoreCase("dlf")) {
+            if ("dlf".equalsIgnoreCase(type)) {
                 // For aliyun DLF
                 metaStoreClient = new ProxyMetaStoreClient(hiveConf);
             } else {
@@ -164,33 +164,6 @@ public class HiveMetaStoreClientHelper {
         return metaStoreClient;
     }
 
-    /**
-     * Check to see if the specified table exists in the specified database.
-     *
-     * @param client HiveMetaStoreClient
-     * @param dbName the specified database name
-     * @param tblName the specified table name
-     * @return TRUE if specified.tableName exists, FALSE otherwise.
-     * @throws DdlException
-     */
-    public static boolean tableExists(IMetaStoreClient client, String dbName, 
String tblName) throws DdlException {
-        try {
-            return client.tableExists(dbName, tblName);
-        } catch (TException e) {
-            LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
-            throw new DdlException("Connect hive metastore failed. Error: " + 
e.getMessage());
-        } finally {
-            dropClient(client);
-        }
-    }
-
-    /**
-     * close connection to meta store
-     */
-    public static void dropClient(IMetaStoreClient client) {
-        client.close();
-    }
-
     /**
      * Get data files of partitions in hive table, filter by partition 
predicate.
      *
@@ -401,6 +374,7 @@ public class HiveMetaStoreClientHelper {
 
     /**
      * Get hive table with dbName and tableName.
+     * Only for Hudi.
      *
      * @param dbName database name
      * @param tableName table name
@@ -408,6 +382,7 @@ public class HiveMetaStoreClientHelper {
      * @return HiveTable
      * @throws DdlException when get table from hive metastore failed.
      */
+    @Deprecated
     public static Table getTable(String dbName, String tableName, String 
metaStoreUris) throws DdlException {
         IMetaStoreClient client = getClient(metaStoreUris);
         Table table;
@@ -422,26 +397,6 @@ public class HiveMetaStoreClientHelper {
         return table;
     }
 
-    /**
-     * Get table schema.
-     *
-     * @param dbName Database name.
-     * @param tableName Table name.
-     * @param metaStoreUris Hive metastore uri.
-     */
-    public static List<FieldSchema> getSchema(String dbName, String tableName, 
String metaStoreUris)
-            throws DdlException {
-        IMetaStoreClient client = getClient(metaStoreUris);
-        try {
-            return client.getSchema(dbName, tableName);
-        } catch (TException e) {
-            LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
-            throw new DdlException("Connect hive metastore failed. Error: " + 
e.getMessage());
-        } finally {
-            client.close();
-        }
-    }
-
     /**
      * Convert Doris expr to Hive expr, only for partition column
      * @param tblName
@@ -927,6 +882,39 @@ public class HiveMetaStoreClientHelper {
         }
         return output.toString();
     }
+
+    public static Map<String, String> getPropertiesForDLF(String catalogName, 
HiveConf hiveConf) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("get properties from hive-site.xml for catalog {}: {}", 
catalogName, hiveConf.getAllProperties());
+        }
+        Map<String, String> res = Maps.newHashMap();
+        String metastoreType = hiveConf.get(HIVE_METASTORE_TYPE);
+        if (!"dlf".equalsIgnoreCase(metastoreType)) {
+            return res;
+        }
+
+        // get following properties from hive-site.xml
+        // 1. region and endpoint. eg: cn-beijing
+        String region = hiveConf.get("dlf.catalog.region");
+        if (!Strings.isNullOrEmpty(region)) {
+            res.put(HiveTable.AWS_REGION, "oss-" + region);
+            res.put(HiveTable.S3_ENDPOINT, "http://oss-" + region + 
".aliyuncs.com");
+        }
+
+        // 2. ak and sk
+        String ak = hiveConf.get("dlf.catalog.accessKeyId");
+        String sk = hiveConf.get("dlf.catalog.accessKeySecret");
+        if (!Strings.isNullOrEmpty(ak)) {
+            res.put(HiveTable.S3_AK, ak);
+        }
+        if (!Strings.isNullOrEmpty(sk)) {
+            res.put(HiveTable.S3_SK, sk);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("get properties for oss in hive-site.xml for catalog {}: 
{}", catalogName, res);
+        }
+        return res;
+    }
 }
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 3da7914ff8..3d8b2e5778 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.InitTableLog;
+import org.apache.doris.datasource.PooledHiveMetaStoreClient;
 import org.apache.doris.qe.MasterCatalogExecutor;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TTableDescriptor;
@@ -32,16 +33,13 @@ import org.apache.doris.thrift.TTableType;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -177,16 +175,12 @@ public class HMSExternalTable extends ExternalTable {
         if (dlaType.equals(DLAType.UNKNOWN)) {
             schemaChanged = true;
         } else {
-            try {
-                for (FieldSchema field : 
HiveMetaStoreClientHelper.getSchema(dbName, name,
-                        ((HMSExternalCatalog) 
catalog).getHiveMetastoreUris())) {
-                    int columnId = (int) Env.getCurrentEnv().getNextId();
-                    tmpSchema.add(new Column(field.getName(),
-                            
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
-                            true, null, field.getComment(), true, null, 
columnId));
-                }
-            } catch (DdlException e) {
-                LOG.warn("Fail to get schema of hms table {}", name, e);
+            List<FieldSchema> schema = ((HMSExternalCatalog) 
catalog).getClient().getSchema(dbName, name);
+            for (FieldSchema field : schema) {
+                int columnId = (int) Env.getCurrentEnv().getNextId();
+                tmpSchema.add(new Column(field.getName(),
+                        
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
+                        true, null, field.getComment(), true, null, columnId));
             }
             if (fullSchema == null || fullSchema.size() != tmpSchema.size()) {
                 schemaChanged = true;
@@ -219,17 +213,10 @@ public class HMSExternalTable extends ExternalTable {
         if (remoteTable == null) {
             synchronized (this) {
                 if (remoteTable == null) {
-                    String uri = ((HMSExternalCatalog) 
catalog).getHiveMetastoreUris();
-                    try {
-                        remoteTable = 
HiveMetaStoreClientHelper.getTable(dbName, name, uri);
-                    } catch (DdlException e) {
-                        LOG.warn("Fail to get remote hive table. db {}, table 
{}, uri {}", dbName, name, uri);
-                        throw new MetaNotFoundException(e);
-                    }
+                    remoteTable = ((HMSExternalCatalog) 
catalog).getClient().getTable(dbName, name);
                 }
             }
         }
-        // TODO: Refresh cached remoteTable
         return remoteTable;
     }
 
@@ -354,20 +341,11 @@ public class HMSExternalTable extends ExternalTable {
     }
 
     public List<Partition> getHivePartitions(ExprNodeGenericFuncDesc 
hivePartitionPredicate) throws DdlException {
-        List<Partition> hivePartitions = new ArrayList<>();
-        IMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
-        try {
-            client.listPartitionsByExpr(remoteTable.getDbName(), 
remoteTable.getTableName(),
-                    
SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate),
-                    null, (short) -1, hivePartitions);
-        } catch (TException e) {
-            LOG.warn("Hive metastore thrift exception: {}", e);
-            throw new DdlException("Connect hive metastore failed: " + 
e.getMessage());
-        } finally {
-            client.close();
-        }
+        List<Partition> hivePartitions = Lists.newArrayList();
+        PooledHiveMetaStoreClient client = ((HMSExternalCatalog) 
catalog).getClient();
+        client.listPartitionsByExpr(remoteTable.getDbName(), 
remoteTable.getTableName(),
+                
SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), 
hivePartitions);
         return hivePartitions;
     }
-
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 4cb2bd877b..6a9e07c804 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -64,4 +64,14 @@ public class FeConstants {
     public static String csv_with_names_and_types = "csv_with_names_and_types";
 
     public static String text = "text";
+
+    public static String FS_PREFIX_S3 = "s3";
+    public static String FS_PREFIX_S3A = "s3a";
+    public static String FS_PREFIX_S3N = "s3n";
+    public static String FS_PREFIX_OSS = "oss";
+    public static String FS_PREFIX_BOS = "bos";
+    public static String FS_PREFIX_COS = "cos";
+    public static String FS_PREFIX_OBS = "obs";
+    public static String FS_PREFIX_HDFS = "hdfs";
+    public static String FS_PREFIX_FILE = "file";
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index b92a515751..e1687b0f7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -104,7 +104,7 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         if (catalog != null) {
             String catalogName = catalog.getName();
             if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
-                ((ExternalCatalog) catalog).setInitialized(false);
+                ((ExternalCatalog) catalog).setUninitialized();
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
index e614c97006..29123395ed 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
@@ -23,10 +23,8 @@ import org.apache.doris.catalog.external.EsExternalDatabase;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.common.util.Util;
 import org.apache.doris.external.elasticsearch.EsRestClient;
 import org.apache.doris.external.elasticsearch.EsUtil;
-import org.apache.doris.qe.MasterCatalogExecutor;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -118,33 +116,13 @@ public class EsExternalCatalog extends ExternalCatalog {
         }
     }
 
-    /**
-     * Datasource can't be init when creating because the external datasource 
may depend on third system.
-     * So you have to make sure the client of third system is initialized 
before any method was called.
-     */
     @Override
-    public synchronized void makeSureInitialized() {
-        if (!objectCreated) {
-            esRestClient = new EsRestClient(this.nodes, this.username, 
this.password, this.enableSsl);
-            objectCreated = true;
-        }
-        if (!initialized) {
-            if (!Env.getCurrentEnv().isMaster()) {
-                // Forward to master and wait the journal to replay.
-                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
-                try {
-                    remoteExecutor.forward(id, -1, -1);
-                } catch (Exception e) {
-                    Util.logAndThrowRuntimeException(LOG,
-                            String.format("failed to forward init catalog %s 
operation to master.", name), e);
-                }
-                return;
-            }
-            init();
-        }
+    protected void initLocalObjectsImpl() {
+        esRestClient = new EsRestClient(this.nodes, this.username, 
this.password, this.enableSsl);
     }
 
-    private void init() {
+    @Override
+    protected void init() {
         InitCatalogLog initCatalogLog = new InitCatalogLog();
         this.esRestClient = new EsRestClient(this.nodes, this.username, 
this.password, this.enableSsl);
         initCatalogLog.setCatalogId(id);
@@ -161,7 +139,6 @@ public class EsExternalCatalog extends ExternalCatalog {
             idToDb.put(defaultDbId, db);
             initCatalogLog.addCreateDb(defaultDbId, DEFAULT_DB);
         }
-        initialized = true;
         Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index e6e9609ab3..ff82a4dda0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -17,14 +17,17 @@
 
 package org.apache.doris.datasource;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.EsExternalDatabase;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.HMSExternalDatabase;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.MasterCatalogExecutor;
 
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
@@ -58,14 +61,14 @@ public abstract class ExternalCatalog implements 
CatalogIf<ExternalDatabase>, Wr
     @SerializedName(value = "catalogProperty")
     protected CatalogProperty catalogProperty = new CatalogProperty();
     @SerializedName(value = "initialized")
-    protected boolean initialized = false;
+    private boolean initialized = false;
 
     // Cache of db name to db id
     @SerializedName(value = "idToDb")
     protected Map<Long, ExternalDatabase> idToDb = Maps.newConcurrentMap();
     // db name does not contains "default_cluster"
     protected Map<String, Long> dbNameToId = Maps.newConcurrentMap();
-    protected boolean objectCreated = false;
+    private boolean objectCreated = false;
 
     /**
      * @return names of database in this catalog.
@@ -87,10 +90,45 @@ public abstract class ExternalCatalog implements 
CatalogIf<ExternalDatabase>, Wr
      */
     public abstract boolean tableExist(SessionContext ctx, String dbName, 
String tblName);
 
-    public abstract void makeSureInitialized();
+    /**
+     * Catalog can't be initialized at creation time because the external catalog 
may depend on a third-party system.
+     * So you have to make sure the client of the third-party system is initialized 
before any method is called.
+     */
+    public final synchronized void makeSureInitialized() {
+        initLocalObjects();
+        if (!initialized) {
+            if (!Env.getCurrentEnv().isMaster()) {
+                // Forward to master and wait the journal to replay.
+                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
+                try {
+                    remoteExecutor.forward(id, -1, -1);
+                } catch (Exception e) {
+                    Util.logAndThrowRuntimeException(LOG,
+                            String.format("failed to forward init catalog %s 
operation to master.", name), e);
+                }
+                return;
+            }
+            init();
+            initialized = true;
+        }
+    }
+
+    protected final void initLocalObjects() {
+        if (!objectCreated) {
+            initLocalObjectsImpl();
+            objectCreated = true;
+        }
+    }
+
+    // init some local objects such as:
+    // hms client, read properties from hive-site.xml, es client
+    protected abstract void initLocalObjectsImpl();
+
+    // init schema related objects
+    protected abstract void init();
 
-    public void setInitialized(boolean initialized) {
-        this.initialized = initialized;
+    public void setUninitialized() {
+        this.initialized = false;
     }
 
     public ExternalDatabase getDbForReplay(long dbId) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 5896c96622..f348e93787 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -22,17 +22,12 @@ import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.HMSExternalDatabase;
 import org.apache.doris.cluster.ClusterNamespace;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.util.Util;
-import org.apache.doris.qe.MasterCatalogExecutor;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
 import org.jetbrains.annotations.Nullable;
 
 import java.util.List;
@@ -44,7 +39,8 @@ import java.util.Map;
 public class HMSExternalCatalog extends ExternalCatalog {
     private static final Logger LOG = 
LogManager.getLogger(HMSExternalCatalog.class);
 
-    protected IMetaStoreClient client;
+    private static final int MAX_CLIENT_POOL_SIZE = 8;
+    protected PooledHiveMetaStoreClient client;
 
     /**
      * Default constructor for HMSExternalCatalog.
@@ -61,23 +57,15 @@ public class HMSExternalCatalog extends ExternalCatalog {
         return catalogProperty.getOrDefault("hive.metastore.uris", "");
     }
 
-    private void init() {
+    @Override
+    protected void init() {
         Map<String, Long> tmpDbNameToId = Maps.newConcurrentMap();
         Map<Long, ExternalDatabase> tmpIdToDb = Maps.newConcurrentMap();
         InitCatalogLog initCatalogLog = new InitCatalogLog();
         initCatalogLog.setCatalogId(id);
         initCatalogLog.setType(InitCatalogLog.Type.HMS);
-        List<String> allDatabases;
-        try {
-            allDatabases = client.getAllDatabases();
-        } catch (TException e) {
-            LOG.warn("Fail to init db name to id map. {}", e.getMessage());
-            return;
-        }
+        List<String> allDatabases = client.getAllDatabases();
         // Update the db name to id map.
-        if (allDatabases == null) {
-            return;
-        }
         for (String dbName : allDatabases) {
             long dbId;
             if (dbNameToId != null && dbNameToId.containsKey(dbName)) {
@@ -97,41 +85,22 @@ public class HMSExternalCatalog extends ExternalCatalog {
         }
         dbNameToId = tmpDbNameToId;
         idToDb = tmpIdToDb;
-        initialized = true;
         Env.getCurrentEnv().getEditLog().logInitCatalog(initCatalogLog);
     }
 
-    /**
-     * Catalog can't be init when creating because the external catalog may 
depend on third system.
-     * So you have to make sure the client of third system is initialized 
before any method was called.
-     */
     @Override
-    public synchronized void makeSureInitialized() {
-        if (!objectCreated) {
-            try {
-                HiveConf hiveConf = new HiveConf();
-                hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
getHiveMetastoreUris());
-                client = HiveMetaStoreClientHelper.getClient(hiveConf);
-                objectCreated = true;
-            } catch (DdlException e) {
-                Util.logAndThrowRuntimeException(LOG,
-                        String.format("failed to create hive meta store client 
for catalog: %s", name), e);
-            }
-        }
-        if (!initialized) {
-            if (!Env.getCurrentEnv().isMaster()) {
-                // Forward to master and wait the journal to replay.
-                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
-                try {
-                    remoteExecutor.forward(id, -1, -1);
-                } catch (Exception e) {
-                    Util.logAndThrowRuntimeException(LOG,
-                            String.format("failed to forward init catalog %s 
operation to master.", name), e);
-                }
-                return;
-            }
-            init();
-        }
+    protected void initLocalObjectsImpl() {
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
getHiveMetastoreUris());
+
+        // 1. read properties from hive-site.xml.
+        // and then use properties in CatalogProperty to override properties 
got from hive-site.xml
+        Map<String, String> properties = 
HiveMetaStoreClientHelper.getPropertiesForDLF(name, hiveConf);
+        properties.putAll(catalogProperty.getProperties());
+        catalogProperty.setProperties(properties);
+
+        // 2. init hms client
+        client = new PooledHiveMetaStoreClient(hiveConf, MAX_CLIENT_POOL_SIZE);
     }
 
     @Override
@@ -149,25 +118,13 @@ public class HMSExternalCatalog extends ExternalCatalog {
             hmsExternalDatabase.getTables().stream().forEach(table -> 
names.add(table.getName()));
             return names;
         } else {
-            try {
-                return client.getAllTables(getRealTableName(dbName));
-            } catch (TException e) {
-                Util.logAndThrowRuntimeException(LOG, String.format("list 
table names failed for %s.%s", name, dbName),
-                        e);
-            }
+            return client.getAllTables(getRealTableName(dbName));
         }
-        return Lists.newArrayList();
     }
 
     @Override
     public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
-        try {
-            return client.tableExists(getRealTableName(dbName), tblName);
-        } catch (TException e) {
-            Util.logAndThrowRuntimeException(LOG,
-                    String.format("check table exist failed for %s.%s.%s", 
name, dbName, tblName), e);
-        }
-        return false;
+        return client.tableExists(getRealTableName(dbName), tblName);
     }
 
     @Nullable
@@ -198,7 +155,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
         return idToDb.get(dbId);
     }
 
-    public IMetaStoreClient getClient() {
+    public PooledHiveMetaStoreClient getClient() {
         makeSureInitialized();
         return client;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index a729f9e71f..c66ee75e46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -181,7 +181,7 @@ import com.google.common.collect.Sets;
 import lombok.Getter;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
@@ -1215,6 +1215,16 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                 } else {
                     typeDef = new TypeDef(resultExpr.getType());
                 }
+                if (i == 0) {
+                    // If this is the first column, because olap table does 
not support the first column to be
+                    // string, float, double or array, we should check and 
modify its type
+                    // For string type, change it to varchar.
+                    // For other unsupported types, just remain unchanged; the 
analysis phase of the create table stmt
+                    // will handle it.
+                    if (typeDef.getType() == Type.STRING) {
+                        typeDef = 
TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH);
+                    }
+                }
                 ColumnDef columnDef;
                 if (resultExpr.getSrcSlotRef() == null) {
                     columnDef = new ColumnDef(name, typeDef, false, null, 
true, new DefaultValue(false, null), "");
@@ -2268,10 +2278,11 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         HiveTable hiveTable = new HiveTable(tableId, tableName, columns, 
stmt.getProperties());
         hiveTable.setComment(stmt.getComment());
         // check hive table whether exists in hive database
-        IMetaStoreClient hiveMetaStoreClient = 
HiveMetaStoreClientHelper.getClient(
-                
hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
-        if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient, 
hiveTable.getHiveDb(),
-                hiveTable.getHiveTable())) {
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.set(HiveMetaStoreClientHelper.HIVE_METASTORE_URIS,
+                
hiveTable.getHiveProperties().get(HiveMetaStoreClientHelper.HIVE_METASTORE_URIS));
+        PooledHiveMetaStoreClient client = new 
PooledHiveMetaStoreClient(hiveConf, 1);
+        if (!client.tableExists(hiveTable.getHiveDb(), 
hiveTable.getHiveTable())) {
             throw new DdlException(String.format("Table [%s] dose not exist in 
Hive.", hiveTable.getHiveDbTable()));
         }
         // check hive table if exists in doris database
@@ -2290,15 +2301,17 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         // check hudi properties in create stmt.
         HudiUtils.validateCreateTable(hudiTable);
         // check hudi table whether exists in hive database
-        String metastoreUris = 
hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
-        IMetaStoreClient hiveMetaStoreClient = 
HiveMetaStoreClientHelper.getClient(metastoreUris);
-        if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient, 
hudiTable.getHmsDatabaseName(),
-                hudiTable.getHmsTableName())) {
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.set(HiveMetaStoreClientHelper.HIVE_METASTORE_URIS,
+                
hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS));
+        PooledHiveMetaStoreClient client = new 
PooledHiveMetaStoreClient(hiveConf, 1);
+        if (!client.tableExists(hudiTable.getHmsDatabaseName(), 
hudiTable.getHmsTableName())) {
             throw new DdlException(
                     String.format("Table [%s] dose not exist in Hive 
Metastore.", hudiTable.getHmsTableIdentifer()));
         }
-        org.apache.hadoop.hive.metastore.api.Table hiveTable = 
HiveMetaStoreClientHelper.getTable(
-                hudiTable.getHmsDatabaseName(), hudiTable.getHmsTableName(), 
metastoreUris);
+
+        org.apache.hadoop.hive.metastore.api.Table hiveTable = client.getTable(
+                hudiTable.getHmsDatabaseName(), hudiTable.getHmsTableName());
         if (!HudiUtils.isHudiTable(hiveTable)) {
             throw new DdlException(String.format("Table [%s] is not a hudi 
table.", hudiTable.getHmsTableIdentifer()));
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
new file mode 100644
index 0000000000..c2f3567f56
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.common.Config;
+
+import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * A hive metastore client pool for a specific hive conf.
+ */
+public class PooledHiveMetaStoreClient {
+    private static final Logger LOG = 
LogManager.getLogger(PooledHiveMetaStoreClient.class);
+
+    private Queue<CachedClient> clientPool = new LinkedList<>();
+    private static final HiveMetaHookLoader DUMMY_HOOK_LOADER = t -> null;
+    private final int poolSize;
+    private final HiveConf hiveConf;
+
+    public PooledHiveMetaStoreClient(HiveConf hiveConf, int pooSize) {
+        Preconditions.checkArgument(pooSize > 0, pooSize);
+        hiveConf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
+                String.valueOf(Config.hive_metastore_client_timeout_second));
+        this.hiveConf = hiveConf;
+        this.poolSize = pooSize;
+    }
+
+    public List<String> getAllDatabases() {
+        try (CachedClient client = getClient()) {
+            return client.client.getAllDatabases();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public List<String> getAllTables(String dbName) {
+        try (CachedClient client = getClient()) {
+            return client.client.getAllTables(dbName);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public boolean tableExists(String dbName, String tblName) {
+        try (CachedClient client = getClient()) {
+            return client.client.tableExists(dbName, tblName);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public boolean listPartitionsByExpr(String dbName, String tblName,
+            byte[] partitionPredicatesInBytes, List<Partition> hivePartitions) 
{
+        try (CachedClient client = getClient()) {
+            return client.client.listPartitionsByExpr(dbName, tblName, 
partitionPredicatesInBytes,
+                    null, (short) -1, hivePartitions);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Table getTable(String dbName, String tblName) {
+        try (CachedClient client = getClient()) {
+            return client.client.getTable(dbName, tblName);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public List<FieldSchema> getSchema(String dbName, String tblName) {
+        try (CachedClient client = getClient()) {
+            return client.client.getSchema(dbName, tblName);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private class CachedClient implements AutoCloseable {
+        private final IMetaStoreClient client;
+
+        private CachedClient(HiveConf hiveConf) throws MetaException {
+            String type = 
hiveConf.get(HiveMetaStoreClientHelper.HIVE_METASTORE_TYPE);
+            if (HiveMetaStoreClientHelper.DLF_TYPE.equalsIgnoreCase(type)) {
+                client = RetryingMetaStoreClient.getProxy(hiveConf, 
DUMMY_HOOK_LOADER,
+                        ProxyMetaStoreClient.class.getName());
+            } else {
+                client = RetryingMetaStoreClient.getProxy(hiveConf, 
DUMMY_HOOK_LOADER,
+                        HiveMetaStoreClient.class.getName());
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            synchronized (clientPool) {
+                if (clientPool.size() > poolSize) {
+                    client.close();
+                } else {
+                    clientPool.offer(this);
+                }
+            }
+        }
+    }
+
+    private CachedClient getClient() throws MetaException {
+        synchronized (clientPool) {
+            CachedClient client = clientPool.poll();
+            if (client == null) {
+                return new CachedClient(hiveConf);
+            }
+            return client;
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 85425f4cb2..9fd59fc43d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.external.hive.util.HiveUtil;
@@ -65,7 +66,7 @@ public class HiveScanProvider extends HMSTableScanProvider {
     private static final Logger LOG = 
LogManager.getLogger(HiveScanProvider.class);
 
     private static final String PROP_FIELD_DELIMITER = "field.delim";
-    private static final String DEFAULT_FIELD_DELIMITER = "\001";
+    private static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
     private static final String DEFAULT_LINE_DELIMITER = "\n";
 
     protected HMSExternalTable hmsTable;
@@ -101,13 +102,21 @@ public class HiveScanProvider extends 
HMSTableScanProvider {
     public TFileType getLocationType() throws DdlException, 
MetaNotFoundException {
         String location = hmsTable.getRemoteTable().getSd().getLocation();
         if (location != null && !location.isEmpty()) {
-            if (location.startsWith("s3a") || location.startsWith("s3n")) {
+            if (location.startsWith(FeConstants.FS_PREFIX_S3)
+                    || location.startsWith(FeConstants.FS_PREFIX_S3A)
+                    || location.startsWith(FeConstants.FS_PREFIX_S3N)
+                    || location.startsWith(FeConstants.FS_PREFIX_BOS)
+                    || location.startsWith(FeConstants.FS_PREFIX_COS)
+                    || location.startsWith(FeConstants.FS_PREFIX_OSS)
+                    || location.startsWith(FeConstants.FS_PREFIX_OBS)) {
                 return TFileType.FILE_S3;
-            } else if (location.startsWith("hdfs:")) {
+            } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
                 return TFileType.FILE_HDFS;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
+                return TFileType.FILE_LOCAL;
             }
         }
-        throw new DdlException("Unknown file location for hms table.");
+        throw new DdlException("Unknown file location " + location + " for hms 
table " + hmsTable.getName());
     }
 
     @Override
@@ -117,7 +126,10 @@ public class HiveScanProvider extends HMSTableScanProvider 
{
 
     @Override
     public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, 
UserException {
-        String splitsPath = getRemoteHiveTable().getSd().getLocation();
+        // eg:
+        // oss://buckets/data_dir
+        // hdfs://hosts/data_dir
+        String location = getRemoteHiveTable().getSd().getLocation();
         List<String> partitionKeys = 
getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName)
                 .collect(Collectors.toList());
         List<Partition> hivePartitions = new ArrayList<>();
@@ -146,7 +158,7 @@ public class HiveScanProvider extends HMSTableScanProvider {
                 throw new IOException(e);
             }
         } else {
-            splits = getSplitsByPath(inputFormat, configuration, splitsPath);
+            splits = getSplitsByPath(inputFormat, configuration, location);
         }
         return HiveBucketUtil.getPrunedSplitsByBuckets(splits, 
hmsTable.getName(), exprs,
                 getRemoteHiveTable().getSd().getBucketCols(), 
getRemoteHiveTable().getSd().getNumBuckets(),
@@ -154,7 +166,8 @@ public class HiveScanProvider extends HMSTableScanProvider {
     }
 
     private List<InputSplit> getSplitsByPath(InputFormat<?, ?> inputFormat, 
Configuration configuration,
-            String splitsPath) throws IOException {
+            String location) throws IOException {
+        String finalLocation = convertToS3IfNecessary(location);
         JobConf jobConf = new JobConf(configuration);
        // For Tez engine, it may generate subdirectories for "union" query.
        // So there may be files and directories in the table directory at the 
same time. eg:
@@ -165,16 +178,32 @@ public class HiveScanProvider extends 
HMSTableScanProvider {
         // Otherwise, getSplits() may throw exception: "Not a file xxx"
         // 
https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
         jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", 
"true");
-        FileInputFormat.setInputPaths(jobConf, splitsPath);
+        FileInputFormat.setInputPaths(jobConf, finalLocation);
         InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
         return Lists.newArrayList(splits);
     }
 
+    // convert oss:// to s3://
+    private String convertToS3IfNecessary(String location) throws IOException {
+        LOG.debug("try convert location to s3 prefix: " + location);
+        if (location.startsWith(FeConstants.FS_PREFIX_COS)
+                || location.startsWith(FeConstants.FS_PREFIX_BOS)
+                || location.startsWith(FeConstants.FS_PREFIX_BOS)
+                || location.startsWith(FeConstants.FS_PREFIX_OSS)
+                || location.startsWith(FeConstants.FS_PREFIX_S3A)
+                || location.startsWith(FeConstants.FS_PREFIX_S3N)) {
+            int pos = location.indexOf("://");
+            if (pos == -1) {
+                throw new IOException("No '://' found in location: " + 
location);
+            }
+            return "s3" + location.substring(pos);
+        }
+        return location;
+    }
 
     protected Configuration setConfiguration() {
         Configuration conf = new HdfsConfiguration();
-        Map<String, String> dfsProperties = hmsTable.getDfsProperties();
-        for (Map.Entry<String, String> entry : dfsProperties.entrySet()) {
+        for (Map.Entry<String, String> entry : 
hmsTable.getCatalog().getCatalogProperty().getProperties().entrySet()) {
             conf.set(entry.getKey(), entry.getValue());
         }
         Map<String, String> s3Properties = hmsTable.getS3Properties();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 70154ce59c..4b865560d7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -70,6 +70,9 @@ public abstract class QueryScanProvider implements 
FileScanProviderIf {
 
             String fullPath = ((FileSplit) 
inputSplits.get(0)).getPath().toUri().toString();
             String filePath = ((FileSplit) 
inputSplits.get(0)).getPath().toUri().getPath();
+            // eg:
+            // hdfs://namenode
+            // s3://buckets
             String fsName = fullPath.replace(filePath, "");
             TFileType locationType = getLocationType();
             context.params.setFileType(locationType);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to