This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eed5a292c [feature-wip](multi-catalog) Support hive partition cache 
(#14134)
7eed5a292c is described below

commit 7eed5a292c71a5ff67dce8333e58e4aa49ddb276
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Mon Nov 14 14:12:40 2022 +0800

    [feature-wip](multi-catalog) Support hive partition cache (#14134)
---
 be/src/exec/text_converter.hpp                     |   2 +-
 docs/en/docs/admin-manual/config/fe-config.md      |  52 +++
 docs/zh-CN/docs/admin-manual/config/fe-config.md   |  43 ++-
 .../maint-monitor/monitor-metrics/metrics.md       |   5 +
 .../java/org/apache/doris/analysis/ColumnDef.java  |   4 +-
 .../main/java/org/apache/doris/catalog/Env.java    |   8 +
 .../org/apache/doris/catalog/PartitionKey.java     |  21 +-
 .../org/apache/doris/catalog/RefreshManager.java   |   3 +-
 .../doris/catalog/external/EsExternalDatabase.java |  24 +-
 .../doris/catalog/external/EsExternalTable.java    |  82 +---
 .../doris/catalog/external/ExternalDatabase.java   |  27 +-
 .../doris/catalog/external/ExternalTable.java      |  43 +--
 .../catalog/external/HMSExternalDatabase.java      |  24 +-
 .../doris/catalog/external/HMSExternalTable.java   | 136 ++-----
 .../main/java/org/apache/doris/common/Config.java  |  28 ++
 .../apache/doris/datasource/CacheException.java    |  24 ++
 .../org/apache/doris/datasource/CatalogMgr.java    |  25 +-
 .../apache/doris/datasource/EsExternalCatalog.java |   7 +
 .../apache/doris/datasource/ExternalCatalog.java   |   7 +-
 .../doris/datasource/ExternalMetaCacheMgr.java     |  95 +++++
 .../doris/datasource/ExternalSchemaCache.java      | 131 +++++++
 .../doris/datasource/HMSClientException.java       |  24 ++
 .../doris/datasource/HMSExternalCatalog.java       |  15 +
 .../datasource/PooledHiveMetaStoreClient.java      |  42 +-
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 423 +++++++++++++++++++++
 .../doris/datasource/hive/HivePartition.java       |  46 +++
 .../java/org/apache/doris/persist/EditLog.java     |   8 +-
 .../org/apache/doris/persist/OperationType.java    |   1 +
 .../java/org/apache/doris/planner/ScanNode.java    |   1 -
 .../planner/external/ExternalFileScanNode.java     |  12 +-
 .../doris/planner/external/HiveScanProvider.java   | 151 ++++----
 .../doris/planner/external/HudiScanProvider.java   |   7 +-
 .../planner/external/IcebergScanProvider.java      |   9 +-
 .../doris/planner/external/QueryScanProvider.java  |  14 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   2 +-
 35 files changed, 1155 insertions(+), 391 deletions(-)

diff --git a/be/src/exec/text_converter.hpp b/be/src/exec/text_converter.hpp
index 4eaa065947..45a7a4e570 100644
--- a/be/src/exec/text_converter.hpp
+++ b/be/src/exec/text_converter.hpp
@@ -206,7 +206,6 @@ inline bool TextConverter::write_vec_column(const 
SlotDescriptor* slot_desc,
     }
 
     StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
-
     // Parse the raw-text data. Translate the text string to internal format.
     switch (slot_desc->type().type) {
     case TYPE_HLL: {
@@ -314,6 +313,7 @@ inline bool TextConverter::write_vec_column(const 
SlotDescriptor* slot_desc,
             size_t size = nullable_column->get_null_map_data().size();
             doris::vectorized::NullMap& null_map_data = 
nullable_column->get_null_map_data();
             null_map_data[size - 1] = 1;
+            nullable_column->get_nested_column().insert_default();
         }
         return false;
     }
diff --git a/docs/en/docs/admin-manual/config/fe-config.md 
b/docs/en/docs/admin-manual/config/fe-config.md
index 66ae4c75d7..71fc393f72 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2258,3 +2258,55 @@ Default: 1
 Is it possible to configure dynamically: true
 
 Whether it is a configuration item unique to the Master FE node: true
+
+### `max_replica_count_when_schema_change`
+
+The maximum number of replicas allowed when OlapTable is doing schema changes. 
Too many replicas will lead to FE OOM.
+
+Default: 100000
+
+Is it possible to configure dynamically: true
+
+Whether it is a configuration item unique to the Master FE node: true
+
+### `max_hive_partition_cache_num`
+
+The maximum number of caches for the hive partition.
+
+Default: 100000
+
+Is it possible to dynamically configure: false
+
+Is it a configuration item unique to the Master FE node: false
+
+### `max_external_file_cache_num`
+
+Maximum number of file cache to use for external external tables.
+
+Default: 100000
+
+Is it possible to dynamically configure: false
+
+Is it a configuration item unique to the Master FE node: false
+
+### `max_external_schema_cache_num`
+
+Maximum number of schema cache to use for external external tables.
+
+Default: 10000
+
+Is it possible to dynamically configure: false
+
+Is it a configuration item unique to the Master FE node: false
+
+### `external_cache_expire_time_minutes_after_access`
+
+Set how long the data in the cache expires after the last access. The unit is 
minutes.
+Applies to External Schema Cache as well as Hive Partition Cache.
+
+Default: 1440
+
+Is it possible to dynamically configure: false
+
+Is it a configuration item unique to the Master FE node: false
+
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md 
b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 957f87bcd6..dd2f238032 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2315,7 +2315,7 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
 是否为 Master FE 节点独有的配置项:true
 
 
-### max_replica_count_when_schema_change
+### `max_replica_count_when_schema_change`
 
 OlapTable在做schema change时,允许的最大副本数,副本数过大会导致FE OOM。
 
@@ -2324,3 +2324,44 @@ OlapTable在做schema change时,允许的最大副本数,副本数过大会
 是否可以动态配置:true
 
 是否为 Master FE 节点独有的配置项:true
+
+### `max_hive_partition_cache_num`
+
+hive partition 的最大缓存数量。
+
+默认值:100000
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
+
+### `max_external_file_cache_num`
+
+用于 external 外部表的最大文件缓存数量。
+
+默认值:100000
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
+
+### `max_external_schema_cache_num`
+
+用于 external 外部表的最大 schema 缓存数量。
+
+默认值:10000
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
+
+### `external_cache_expire_time_minutes_after_access`
+
+设置缓存中的数据,在最后一次访问后多久失效。单位为分钟。
+适用于 External Schema Cache 以及 Hive Partition Cache.
+
+默认值:1440
+
+是否可以动态配置:false
+
+是否为 Master FE 节点独有的配置项:false
diff --git 
a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md 
b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 206f021e2e..96ede4e7d5 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -143,6 +143,11 @@ curl http://be_host:webserver_port/metrics?type=json
 |`doris_fe_txn_replica_num`|| Num| 指定DB正在执行的事务打开的副本数。如 {db="test"} 表示DB test 
当前正在执行的事务打开的副本数 |该数值可以观测某个DB是否打开了过多的副本,可能会影响其他事务执行| P0 |
 |`doris_fe_thrift_rpc_total`|| Num| FE thrift接口各个方法接收的RPC请求次数。如 
{method="report"} 表示 report 方法接收的RPC请求次数 |该数值可以观测某个thrift rpc方法的负载| |
 |`doris_fe_thrift_rpc_latency_ms`|| 毫秒| FE thrift接口各个方法接收的RPC请求耗时。如 
{method="report"} 表示 report 方法接收的RPC请求耗时 |该数值可以观测某个thrift rpc方法的负载| |
+|`doris_fe_external_schema_cache` | {catalog="hive"} | Num | 指定 External 
Catalog 对应的 schema cache 的数量 |||
+|`doris_fe_hive_meta_cache` | {catalog="hive"} | Num | |||
+| | `{type="partition_value"}` | Num | 指定 External Hive Metastore Catalog 对应的 
partition value cache 的数量 |||
+| | `{type="partition"}` | Num | 指定 External Hive Metastore Catalog 对应的 
partition cache 的数量 |||
+| | `{type="file"}` | Num | 指定 External Hive Metastore Catalog 对应的 file cache 
的数量 |||
 
 ### JVM 监控
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
index 87267e69d2..36b556c623 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
@@ -199,14 +199,14 @@ public class ColumnDef {
         }
         FeNameFormat.checkColumnName(name);
 
-        // When string type length is not assigned, it need to be assigned to 
1.
+        // When string type length is not assigned, it needs to be assigned to 
1.
         if (typeDef.getType().isScalarType()) {
             final ScalarType targetType = (ScalarType) typeDef.getType();
             if (targetType.getPrimitiveType().isStringType() && 
!targetType.isLengthSet()) {
                 if (targetType.getPrimitiveType() != PrimitiveType.STRING) {
                     targetType.setLength(1);
                 } else {
-                    // alway set text length MAX_STRING_LENGTH
+                    // always set text length MAX_STRING_LENGTH
                     targetType.setLength(ScalarType.MAX_STRING_LENGTH);
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 022f86cbf0..8ce6d7a346 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -124,6 +124,7 @@ import org.apache.doris.consistency.ConsistencyChecker;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.CatalogMgr;
 import org.apache.doris.datasource.EsExternalCatalog;
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.deploy.DeployManager;
 import org.apache.doris.deploy.impl.AmbariDeployManager;
@@ -450,6 +451,8 @@ public class Env {
 
     private final StatisticsCache statisticsCache;
 
+    private ExternalMetaCacheMgr extMetaCacheMgr;
+
     public List<Frontend> getFrontends(FrontendNodeType nodeType) {
         if (nodeType == null) {
             // get all
@@ -515,6 +518,10 @@ public class Env {
         return mtmvJobManager;
     }
 
+    public ExternalMetaCacheMgr getExtMetaCacheMgr() {
+        return extMetaCacheMgr;
+    }
+
     public CatalogIf getCurrentCatalog() {
         ConnectContext ctx = ConnectContext.get();
         if (ctx == null) {
@@ -646,6 +653,7 @@ public class Env {
         this.mtmvJobManager = new MTMVJobManager();
         this.analysisJobScheduler = new AnalysisJobScheduler();
         this.statisticsCache = new StatisticsCache();
+        this.extMetaCacheMgr = new ExternalMetaCacheMgr();
     }
 
     public static void destroyCheckpoint() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index d8bcb961e2..f773209cd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -40,6 +40,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.stream.Collectors;
 import java.util.zip.CRC32;
 
 public class PartitionKey implements Comparable<PartitionKey>, Writable {
@@ -86,7 +87,7 @@ public class PartitionKey implements 
Comparable<PartitionKey>, Writable {
         return partitionKey;
     }
 
-    public static PartitionKey createListPartitionKey(List<PartitionValue> 
values, List<Column> columns)
+    public static PartitionKey 
createListPartitionKeyWithTypes(List<PartitionValue> values, List<Type> types)
             throws AnalysisException {
         // for multi list partition:
         //
@@ -108,18 +109,24 @@ public class PartitionKey implements 
Comparable<PartitionKey>, Writable {
         //     PARTITION p6 VALUES IN ("26")
         // )
         //
-        Preconditions.checkArgument(values.size() == columns.size(),
-                "in value size[" + values.size() + "] is not equal to 
partition column size[" + columns.size() + "].");
+        Preconditions.checkArgument(values.size() == types.size(),
+                "in value size[" + values.size() + "] is not equal to 
partition column size[" + types.size() + "].");
 
         PartitionKey partitionKey = new PartitionKey();
         for (int i = 0; i < values.size(); i++) {
-            
partitionKey.keys.add(values.get(i).getValue(Type.fromPrimitiveType(columns.get(i).getDataType())));
-            partitionKey.types.add(columns.get(i).getDataType());
+            partitionKey.keys.add(values.get(i).getValue(types.get(i)));
+            partitionKey.types.add(types.get(i).getPrimitiveType());
         }
 
         return partitionKey;
     }
 
+    public static PartitionKey createListPartitionKey(List<PartitionValue> 
values, List<Column> columns)
+            throws AnalysisException {
+        List<Type> types = columns.stream().map(c -> 
c.getType()).collect(Collectors.toList());
+        return createListPartitionKeyWithTypes(values, types);
+    }
+
     public void pushColumn(LiteralExpr keyValue, PrimitiveType keyType) {
         keys.add(keyValue);
         types.add(keyType);
@@ -163,6 +170,10 @@ public class PartitionKey implements 
Comparable<PartitionKey>, Writable {
         return true;
     }
 
+    public List<String> getPartitionValuesAsStringList() {
+        return keys.stream().map(k -> 
k.getStringValue()).collect(Collectors.toList());
+    }
+
     public static int compareLiteralExpr(LiteralExpr key1, LiteralExpr key2) {
         int ret = 0;
         if (key1 instanceof MaxLiteral || key2 instanceof MaxLiteral) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index bb37a9b1b9..439c8c73c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -23,7 +23,6 @@ import org.apache.doris.analysis.RefreshDbStmt;
 import org.apache.doris.analysis.RefreshTableStmt;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.external.ExternalDatabase;
-import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
@@ -160,7 +159,7 @@ public class RefreshManager {
         if (table == null) {
             throw new DdlException("Table " + tableName + " does not exist in 
db " + dbName);
         }
-        ((ExternalTable) table).setUnInitialized();
+        Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId(), 
dbName, tableName);
         ExternalObjectLog log = new ExternalObjectLog();
         log.setCatalogId(catalog.getId());
         log.setDbId(db.getId());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
index 784c0b8bdc..e9b3ce354b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalDatabase.java
@@ -18,12 +18,10 @@
 package org.apache.doris.catalog.external;
 
 import org.apache.doris.catalog.Env;
-import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.EsExternalCatalog;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitDatabaseLog;
 import org.apache.doris.persist.gson.GsonPostProcessable;
-import org.apache.doris.qe.MasterCatalogExecutor;
 
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
@@ -65,7 +63,6 @@ public class EsExternalDatabase extends 
ExternalDatabase<EsExternalTable> implem
         Map<Long, EsExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
         for (int i = 0; i < log.getRefreshCount(); i++) {
             EsExternalTable table = 
getTableForReplay(log.getRefreshTableIds().get(i));
-            table.setUnInitialized();
             tmpTableNameToId.put(table.getName(), table.getId());
             tmpIdToTbl.put(table.getId(), table);
         }
@@ -86,24 +83,8 @@ public class EsExternalDatabase extends 
ExternalDatabase<EsExternalTable> implem
         }
     }
 
-    public synchronized void makeSureInitialized() {
-        if (!initialized) {
-            if (!Env.getCurrentEnv().isMaster()) {
-                // Forward to master and wait the journal to replay.
-                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
-                try {
-                    remoteExecutor.forward(extCatalog.getId(), id, -1);
-                } catch (Exception e) {
-                    Util.logAndThrowRuntimeException(LOG,
-                            String.format("failed to forward init external db 
%s operation to master", name), e);
-                }
-                return;
-            }
-            init();
-        }
-    }
-
-    private void init() {
+    @Override
+    protected void init() {
         InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
         initDatabaseLog.setType(InitDatabaseLog.Type.ES);
         initDatabaseLog.setCatalogId(extCatalog.getId());
@@ -118,7 +99,6 @@ public class EsExternalDatabase extends 
ExternalDatabase<EsExternalTable> implem
                     tblId = tableNameToId.get(tableName);
                     tmpTableNameToId.put(tableName, tblId);
                     EsExternalTable table = idToTbl.get(tblId);
-                    table.setUnInitialized();
                     tmpIdToTbl.put(tblId, table);
                     initDatabaseLog.addRefreshTable(tblId);
                 } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
index 5cbafdcd15..eb8c8972b1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/EsExternalTable.java
@@ -18,13 +18,8 @@
 package org.apache.doris.catalog.external;
 
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.EsTable;
-import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.EsExternalCatalog;
-import org.apache.doris.datasource.InitTableLog;
-import org.apache.doris.external.elasticsearch.EsUtil;
-import org.apache.doris.qe.MasterCatalogExecutor;
 import org.apache.doris.thrift.TEsTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
@@ -54,84 +49,13 @@ public class EsExternalTable extends ExternalTable {
         super(id, name, catalog, dbName, TableType.ES_EXTERNAL_TABLE);
     }
 
-
     public synchronized void makeSureInitialized() {
-        if (!initialized) {
-            if (!Env.getCurrentEnv().isMaster()) {
-                fullSchema = null;
-                // Forward to master and wait the journal to replay.
-                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
-                try {
-                    remoteExecutor.forward(catalog.getId(), 
catalog.getDbNullable(dbName).getId(), id);
-                } catch (Exception e) {
-                    Util.logAndThrowRuntimeException(LOG,
-                            String.format("failed to forward init external 
table %s operation to master", name), e);
-                }
-            } else {
-                init();
-            }
-        }
         if (!objectCreated) {
             esTable = toEsTable();
             objectCreated = true;
         }
     }
 
-    private void init() {
-        boolean schemaChanged = false;
-        List<Column> tmpSchema = EsUtil.genColumnsFromEs(
-                ((EsExternalCatalog) catalog).getEsRestClient(), name, null);
-        if (fullSchema == null || fullSchema.size() != tmpSchema.size()) {
-            schemaChanged = true;
-        } else {
-            for (int i = 0; i < fullSchema.size(); i++) {
-                if (!fullSchema.get(i).equals(tmpSchema.get(i))) {
-                    schemaChanged = true;
-                    break;
-                }
-            }
-        }
-        if (schemaChanged) {
-            timestamp = System.currentTimeMillis();
-            fullSchema = tmpSchema;
-            esTable = toEsTable();
-        }
-        initialized = true;
-        InitTableLog initTableLog = new InitTableLog();
-        initTableLog.setCatalogId(catalog.getId());
-        initTableLog.setDbId(catalog.getDbNameToId().get(dbName));
-        initTableLog.setTableId(id);
-        initTableLog.setSchema(fullSchema);
-        Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog);
-    }
-
-    @Override
-    public List<Column> getFullSchema() {
-        makeSureInitialized();
-        return fullSchema;
-    }
-
-    @Override
-    public List<Column> getBaseSchema() {
-        return getFullSchema();
-    }
-
-    @Override
-    public List<Column> getBaseSchema(boolean full) {
-        return getFullSchema();
-    }
-
-    @Override
-    public Column getColumn(String name) {
-        makeSureInitialized();
-        for (Column column : fullSchema) {
-            if (name.equals(column.getName())) {
-                return column;
-            }
-        }
-        return null;
-    }
-
     public EsTable getEsTable() {
         makeSureInitialized();
         return esTable;
@@ -151,16 +75,18 @@ public class EsExternalTable extends ExternalTable {
 
     @Override
     public TTableDescriptor toThrift() {
+        List<Column> schema = getFullSchema();
         TEsTable tEsTable = new TEsTable();
-        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.ES_TABLE, fullSchema.size(), 0,
+        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.ES_TABLE, schema.size(), 0,
                 getName(), "");
         tTableDescriptor.setEsTable(tEsTable);
         return tTableDescriptor;
     }
 
     private EsTable toEsTable() {
+        List<Column> schema = getFullSchema();
         EsExternalCatalog esCatalog = (EsExternalCatalog) catalog;
-        EsTable esTable = new EsTable(this.id, this.name, this.fullSchema, 
TableType.ES_EXTERNAL_TABLE);
+        EsTable esTable = new EsTable(this.id, this.name, schema, 
TableType.ES_EXTERNAL_TABLE);
         esTable.setIndexName(name);
         esTable.setClient(esCatalog.getEsRestClient());
         esTable.setUserName(esCatalog.getUsername());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index c5fa7b9875..87d1cccf5d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -19,15 +19,18 @@ package org.apache.doris.catalog.external;
 
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.DatabaseProperty;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitDatabaseLog;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.MasterCatalogExecutor;
 
 import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang.NotImplementedException;
@@ -87,7 +90,8 @@ public class ExternalDatabase<T extends ExternalTable> 
implements DatabaseIf<T>,
         this.extCatalog = extCatalog;
     }
 
-    public void setTableExtCatalog(ExternalCatalog extCatalog) {}
+    public void setTableExtCatalog(ExternalCatalog extCatalog) {
+    }
 
     public void setUnInitialized() {
         this.initialized = false;
@@ -97,7 +101,26 @@ public class ExternalDatabase<T extends ExternalTable> 
implements DatabaseIf<T>,
         return initialized;
     }
 
-    public void makeSureInitialized() {}
+    public final synchronized void makeSureInitialized() {
+        if (!initialized) {
+            if (!Env.getCurrentEnv().isMaster()) {
+                // Forward to master and wait the journal to replay.
+                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
+                try {
+                    remoteExecutor.forward(extCatalog.getId(), id, -1);
+                } catch (Exception e) {
+                    Util.logAndThrowRuntimeException(LOG,
+                            String.format("failed to forward init external db 
%s operation to master", name), e);
+                }
+                return;
+            }
+            init();
+        }
+    }
+
+    protected void init() {
+        throw new NotImplementedException();
+    }
 
     public T getTableForReplay(long tableId) {
         throw new NotImplementedException();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index f713de7419..36189c9a00 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -19,12 +19,14 @@ package org.apache.doris.catalog.external;
 
 import org.apache.doris.alter.AlterCancelException;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TTableDescriptor;
@@ -56,26 +58,20 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
     protected String name;
     @SerializedName(value = "type")
     protected TableType type = null;
-    @SerializedName(value = "fullSchema")
-    protected volatile List<Column> fullSchema = null;
-    @SerializedName(value = "initialized")
-    protected boolean initialized = false;
     @SerializedName(value = "timestamp")
     protected long timestamp;
-
-    protected ExternalCatalog catalog;
     @SerializedName(value = "dbName")
     protected String dbName;
+
     protected boolean objectCreated = false;
+    protected ExternalCatalog catalog;
     protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
     /**
      * No args constructor for persist.
      */
     public ExternalTable() {
-        this.initialized = false;
         this.objectCreated = false;
-        this.fullSchema = null;
     }
 
     /**
@@ -93,9 +89,7 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
         this.catalog = catalog;
         this.dbName = dbName;
         this.type = type;
-        this.initialized = false;
         this.objectCreated = false;
-        this.fullSchema = null;
     }
 
     public void setCatalog(ExternalCatalog catalog) {
@@ -106,17 +100,10 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
         return false;
     }
 
-    public void setUnInitialized() {
-        this.initialized = false;
-    }
-
-    public void replayInitTable(List<Column> schema) {
-        fullSchema = schema;
-        initialized = true;
+    public void makeSureInitialized() {
+        throw new NotImplementedException();
     }
 
-    public void makeSureInitialized() {}
-
     @Override
     public void readLock() {
         this.rwLock.readLock().lock();
@@ -226,27 +213,35 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
 
     @Override
     public List<Column> getFullSchema() {
-        throw new NotImplementedException();
+        makeSureInitialized();
+        ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        return cache.getSchema(dbName, name);
     }
 
     @Override
     public List<Column> getBaseSchema() {
-        throw new NotImplementedException();
+        return getFullSchema();
     }
 
     @Override
     public List<Column> getBaseSchema(boolean full) {
-        throw new NotImplementedException();
+        return getFullSchema();
     }
 
+
     @Override
     public void setNewFullSchema(List<Column> newSchema) {
-        this.fullSchema = newSchema;
     }
 
     @Override
     public Column getColumn(String name) {
-        throw new NotImplementedException();
+        List<Column> schema = getFullSchema();
+        for (Column column : schema) {
+            if (name.equals(column.getName())) {
+                return column;
+            }
+        }
+        return null;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index ee7c589019..decef86caa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -19,12 +19,10 @@ package org.apache.doris.catalog.external;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.InitDatabaseLog;
 import org.apache.doris.persist.gson.GsonPostProcessable;
-import org.apache.doris.qe.MasterCatalogExecutor;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -68,7 +66,6 @@ public class HMSExternalDatabase extends 
ExternalDatabase<HMSExternalTable> impl
         Map<Long, HMSExternalTable> tmpIdToTbl = Maps.newConcurrentMap();
         for (int i = 0; i < log.getRefreshCount(); i++) {
             HMSExternalTable table = 
getTableForReplay(log.getRefreshTableIds().get(i));
-            table.setUnInitialized();
             tmpTableNameToId.put(table.getName(), table.getId());
             tmpIdToTbl.put(table.getId(), table);
         }
@@ -89,24 +86,8 @@ public class HMSExternalDatabase extends 
ExternalDatabase<HMSExternalTable> impl
         }
     }
 
-    public synchronized void makeSureInitialized() {
-        if (!initialized) {
-            if (!Env.getCurrentEnv().isMaster()) {
-                // Forward to master and wait the journal to replay.
-                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
-                try {
-                    remoteExecutor.forward(extCatalog.getId(), id, -1);
-                } catch (Exception e) {
-                    Util.logAndThrowRuntimeException(LOG,
-                            String.format("failed to forward init external db 
%s operation to master", name), e);
-                }
-                return;
-            }
-            init();
-        }
-    }
-
-    private void init() {
+    @Override
+    protected void init() {
         InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
         initDatabaseLog.setType(InitDatabaseLog.Type.HMS);
         initDatabaseLog.setCatalogId(extCatalog.getId());
@@ -121,7 +102,6 @@ public class HMSExternalDatabase extends 
ExternalDatabase<HMSExternalTable> impl
                     tblId = tableNameToId.get(tableName);
                     tmpTableNameToId.put(tableName, tblId);
                     HMSExternalTable table = idToTbl.get(tblId);
-                    table.setUnInitialized();
                     tmpIdToTbl.put(tblId, table);
                     initDatabaseLog.addRefreshTable(tblId);
                 } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 3d8b2e5778..6e12f7e30a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -18,15 +18,9 @@
 package org.apache.doris.catalog.external;
 
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.common.DdlException;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.HMSExternalCatalog;
-import org.apache.doris.datasource.InitTableLog;
-import org.apache.doris.datasource.PooledHiveMetaStoreClient;
-import org.apache.doris.qe.MasterCatalogExecutor;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
@@ -34,9 +28,6 @@ import org.apache.doris.thrift.TTableType;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -44,6 +35,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Hive metastore external table.
@@ -61,6 +53,8 @@ public class HMSExternalTable extends ExternalTable {
     }
 
     private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = 
null;
+    private List<Column> partitionColumns;
+
     private DLAType dlaType = DLAType.UNKNOWN;
 
     public enum DLAType {
@@ -106,21 +100,6 @@ public class HMSExternalTable extends ExternalTable {
             }
             objectCreated = true;
         }
-        if (!initialized) {
-            if (!Env.getCurrentEnv().isMaster()) {
-                fullSchema = null;
-                // Forward to master and wait the journal to replay.
-                MasterCatalogExecutor remoteExecutor = new 
MasterCatalogExecutor();
-                try {
-                    remoteExecutor.forward(catalog.getId(), 
catalog.getDbNullable(dbName).getId(), id);
-                } catch (Exception e) {
-                    Util.logAndThrowRuntimeException(LOG,
-                            String.format("failed to forward init external 
table %s operation to master", name), e);
-                }
-                return;
-            }
-            init();
-        }
     }
 
     /**
@@ -160,50 +139,10 @@ public class HMSExternalTable extends ExternalTable {
      * Now we only support three file input format hive tables: 
parquet/orc/text. And they must be managed_table.
      */
     private boolean supportedHiveTable() {
-        // boolean isManagedTable = 
remoteTable.getTableType().equalsIgnoreCase("MANAGED_TABLE");
-        // TODO: try to support EXTERNAL_TABLE
-        boolean isManagedTable = true;
         String inputFileFormat = remoteTable.getSd().getInputFormat();
         boolean supportedFileFormat = inputFileFormat != null && 
SUPPORTED_HIVE_FILE_FORMATS.contains(inputFileFormat);
         LOG.debug("hms table {} is {} with file format: {}", name, 
remoteTable.getTableType(), inputFileFormat);
-        return isManagedTable && supportedFileFormat;
-    }
-
-    private void init() {
-        boolean schemaChanged = false;
-        List<Column> tmpSchema = Lists.newArrayList();
-        if (dlaType.equals(DLAType.UNKNOWN)) {
-            schemaChanged = true;
-        } else {
-            List<FieldSchema> schema = ((HMSExternalCatalog) 
catalog).getClient().getSchema(dbName, name);
-            for (FieldSchema field : schema) {
-                int columnId = (int) Env.getCurrentEnv().getNextId();
-                tmpSchema.add(new Column(field.getName(),
-                        
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
-                        true, null, field.getComment(), true, null, columnId));
-            }
-            if (fullSchema == null || fullSchema.size() != tmpSchema.size()) {
-                schemaChanged = true;
-            } else {
-                for (int i = 0; i < fullSchema.size(); i++) {
-                    if (!fullSchema.get(i).equals(tmpSchema.get(i))) {
-                        schemaChanged = true;
-                        break;
-                    }
-                }
-            }
-        }
-        if (schemaChanged) {
-            timestamp = System.currentTimeMillis();
-            fullSchema = tmpSchema;
-        }
-        initialized = true;
-        InitTableLog initTableLog = new InitTableLog();
-        initTableLog.setCatalogId(catalog.getId());
-        initTableLog.setDbId(catalog.getDbNameToId().get(dbName));
-        initTableLog.setTableId(id);
-        initTableLog.setSchema(fullSchema);
-        Env.getCurrentEnv().getEditLog().logInitExternalTable(initTableLog);
+        return supportedFileFormat;
     }
 
     /**
@@ -220,36 +159,25 @@ public class HMSExternalTable extends ExternalTable {
         return remoteTable;
     }
 
-    @Override
-    public boolean isView() {
-        return remoteTable.isSetViewOriginalText() || 
remoteTable.isSetViewExpandedText();
-    }
-
-    @Override
-    public List<Column> getFullSchema() {
+    public List<Type> getPartitionColumnTypes() {
         makeSureInitialized();
-        return fullSchema;
+        initPartitionColumns();
+        return partitionColumns.stream().map(c -> 
c.getType()).collect(Collectors.toList());
     }
 
-    @Override
-    public List<Column> getBaseSchema() {
-        return getFullSchema();
+    public List<Column> getPartitionColumns() {
+        makeSureInitialized();
+        initPartitionColumns();
+        return partitionColumns;
     }
 
-    @Override
-    public List<Column> getBaseSchema(boolean full) {
-        return getFullSchema();
+    public List<String> getPartitionColumnNames() {
+        return getPartitionColumns().stream().map(c -> 
c.getName()).collect(Collectors.toList());
     }
 
     @Override
-    public Column getColumn(String name) {
-        makeSureInitialized();
-        for (Column column : fullSchema) {
-            if (name.equals(column.getName())) {
-                return column;
-            }
-        }
-        return null;
+    public boolean isView() {
+        return remoteTable.isSetViewOriginalText() || 
remoteTable.isSetViewExpandedText();
     }
 
     @Override
@@ -321,8 +249,9 @@ public class HMSExternalTable extends ExternalTable {
 
     @Override
     public TTableDescriptor toThrift() {
+        List<Column> schema = getFullSchema();
         THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
-        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.HIVE_TABLE, fullSchema.size(), 0,
+        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.HIVE_TABLE, schema.size(), 0,
                 getName(), dbName);
         tTableDescriptor.setHiveTable(tHiveTable);
         return tTableDescriptor;
@@ -340,12 +269,29 @@ public class HMSExternalTable extends ExternalTable {
         return catalog.getCatalogProperty().getS3Properties();
     }
 
-    public List<Partition> getHivePartitions(ExprNodeGenericFuncDesc 
hivePartitionPredicate) throws DdlException {
-        List<Partition> hivePartitions = Lists.newArrayList();
-        PooledHiveMetaStoreClient client = ((HMSExternalCatalog) 
catalog).getClient();
-        client.listPartitionsByExpr(remoteTable.getDbName(), 
remoteTable.getTableName(),
-                
SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), 
hivePartitions);
-        return hivePartitions;
+    private void initPartitionColumns() {
+        if (partitionColumns != null) {
+            return;
+        }
+        synchronized (this) {
+            if (partitionColumns != null) {
+                return;
+            }
+            Set<String> partitionKeys = 
remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
+                    .collect(Collectors.toSet());
+            partitionColumns = 
Lists.newArrayListWithCapacity(partitionKeys.size());
+            for (String partitionKey : partitionKeys) {
+                // Do not use "getColumn()", which will cause dead loop
+                List<Column> schema = getFullSchema();
+                for (Column column : schema) {
+                    if (partitionKey.equals(column.getName())) {
+                        partitionColumns.add(column);
+                        break;
+                    }
+                }
+            }
+            LOG.debug("get {} partition columns for table: {}", 
partitionColumns.size(), name);
+        }
     }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index e22913fe52..f0c02c879c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1888,5 +1888,33 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static long max_replica_count_when_schema_change = 100000;
+
+    /**
+     * Max cache num of hive partition.
+     * Decrease this value if FE's memory is small
+     */
+    @ConfField(mutable = false, masterOnly = false)
+    public static long max_hive_partition_cache_num = 100000;
+
+    /**
+     * Max cache num of external catalog's file
+     * Decrease this value if FE's memory is small
+     */
+    @ConfField(mutable = false, masterOnly = false)
+    public static long max_external_file_cache_num = 100000;
+
+    /**
+     * Max cache num of external table's schema
+     * Decrease this value if FE's memory is small
+     */
+    @ConfField(mutable = false, masterOnly = false)
+    public static long max_external_schema_cache_num = 10000;
+
+    /**
+     * The expiration time of a cache object after last access of it.
+     * For external schema cache and hive meta cache.
+     */
+    @ConfField(mutable = false, masterOnly = false)
+    public static long external_cache_expire_time_minutes_after_access = 24 * 
60; // 1 day
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CacheException.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CacheException.java
new file mode 100644
index 0000000000..bbb5ec80dc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CacheException.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+public class CacheException extends RuntimeException {
+    public CacheException(String format, Throwable cause, Object... msg) {
+        super(String.format(format, msg), cause);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index e1687b0f7f..39ec822f6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -95,6 +95,7 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         CatalogIf catalog = idToCatalog.remove(catalogId);
         if (catalog != null) {
             nameToCatalog.remove(catalog.getName());
+            
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getName());
         }
         return catalog;
     }
@@ -423,16 +424,6 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         db.replayInitDb(log, catalog);
     }
 
-    public void replayInitExternalTable(InitTableLog log) {
-        ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
-        Preconditions.checkArgument(catalog != null);
-        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
-        Preconditions.checkArgument(db != null);
-        ExternalTable table = db.getTableForReplay(log.getTableId());
-        Preconditions.checkArgument(table != null);
-        table.replayInitTable(log.getSchema());
-    }
-
     public void replayRefreshExternalDb(ExternalObjectLog log) {
         writeLock();
         try {
@@ -445,15 +436,11 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
     }
 
     public void replayRefreshExternalTable(ExternalObjectLog log) {
-        writeLock();
-        try {
-            ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
-            ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
-            ExternalTable table = db.getTableForReplay(log.getTableId());
-            table.setUnInitialized();
-        } finally {
-            writeUnlock();
-        }
+        ExternalCatalog catalog = (ExternalCatalog) 
idToCatalog.get(log.getCatalogId());
+        ExternalDatabase db = catalog.getDbForReplay(log.getDbId());
+        ExternalTable table = db.getTableForReplay(log.getTableId());
+        Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog)
+                .invalidateCache(db.getFullName(), table.getName());
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
index 29123395ed..c5a410eb3b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/EsExternalCatalog.java
@@ -18,6 +18,7 @@
 package org.apache.doris.datasource;
 
 
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.EsExternalDatabase;
 import org.apache.doris.catalog.external.ExternalDatabase;
@@ -190,4 +191,10 @@ public class EsExternalCatalog extends ExternalCatalog {
         super.gsonPostProcess();
         setProperties(this.catalogProperty.getProperties());
     }
+
+    @Override
+    public List<Column> getSchema(String dbName, String tblName) {
+        makeSureInitialized();
+        return EsUtil.genColumnsFromEs(getEsRestClient(), tblName, null);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index ff82a4dda0..0663fed2e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource;
 
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.EsExternalDatabase;
 import org.apache.doris.catalog.external.ExternalDatabase;
@@ -62,14 +63,14 @@ public abstract class ExternalCatalog implements 
CatalogIf<ExternalDatabase>, Wr
     protected CatalogProperty catalogProperty = new CatalogProperty();
     @SerializedName(value = "initialized")
     private boolean initialized = false;
-
-    // Cache of db name to db id
     @SerializedName(value = "idToDb")
     protected Map<Long, ExternalDatabase> idToDb = Maps.newConcurrentMap();
     // db name does not contains "default_cluster"
     protected Map<String, Long> dbNameToId = Maps.newConcurrentMap();
     private boolean objectCreated = false;
 
+    private ExternalSchemaCache schemaCache;
+
     /**
      * @return names of database in this catalog.
      */
@@ -135,6 +136,8 @@ public abstract class ExternalCatalog implements 
CatalogIf<ExternalDatabase>, Wr
         throw new NotImplementedException();
     }
 
+    public abstract List<Column> getSchema(String dbName, String tblName);
+
     @Override
     public long getId() {
         return id;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
new file mode 100644
index 0000000000..eb9123f814
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * Cache meta of external catalog
+ * 1. Meta for hive meta store, mainly for partition.
+ * 2. Table Schema cahce.
+ */
+public class ExternalMetaCacheMgr {
+    private static final Logger LOG = 
LogManager.getLogger(ExternalMetaCacheMgr.class);
+
+    // catalog id -> HiveMetaStoreCache
+    private Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
+    // catalog id -> table schema cache
+    private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap();
+    private Executor executor;
+
+    public ExternalMetaCacheMgr() {
+        executor = ThreadPoolManager.newDaemonCacheThreadPool(10, 
"ExternalMetaCacheMgr", false);
+    }
+
+    public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
+        HiveMetaStoreCache cache = cacheMap.get(catalog.getId());
+        if (cache == null) {
+            synchronized (cacheMap) {
+                if (!cacheMap.containsKey(catalog.getId())) {
+                    cacheMap.put(catalog.getId(), new 
HiveMetaStoreCache(catalog, executor));
+                }
+                cache = cacheMap.get(catalog.getId());
+            }
+        }
+        return cache;
+    }
+
+    public ExternalSchemaCache getSchemaCache(ExternalCatalog catalog) {
+        ExternalSchemaCache cache = schemaCacheMap.get(catalog.getId());
+        if (cache == null) {
+            synchronized (schemaCacheMap) {
+                if (!schemaCacheMap.containsKey(catalog.getId())) {
+                    schemaCacheMap.put(catalog.getId(), new 
ExternalSchemaCache(catalog, executor));
+                }
+                cache = schemaCacheMap.get(catalog.getId());
+            }
+        }
+        return cache;
+    }
+
+    public void removeCache(String catalogId) {
+        if (cacheMap.remove(catalogId) != null) {
+            LOG.info("remove hive metastore cache for catalog {}" + catalogId);
+        }
+        if (schemaCacheMap.remove(catalogId) != null) {
+            LOG.info("remove schema cache for catalog {}" + catalogId);
+        }
+    }
+
+    public void removeCache(long catalogId, String dbName, String tblName) {
+        ExternalSchemaCache schemaCache = schemaCacheMap.get(catalogId);
+        if (schemaCache != null) {
+            schemaCache.invalidateCache(dbName, tblName);
+            LOG.debug("invalid schema cache for {}.{} in catalog {}", dbName, 
tblName, catalogId);
+        }
+        HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
+        if (metaCache != null) {
+            metaCache.invalidateCache(dbName, tblName);
+            LOG.debug("invalid meta cache for {}.{} in catalog {}", dbName, 
tblName, catalogId);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
new file mode 100644
index 0000000000..718525ae6a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.Config;
+import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric;
+import org.apache.doris.metric.MetricLabel;
+import org.apache.doris.metric.MetricRepo;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import lombok.Data;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+// The schema cache for external table
+public class ExternalSchemaCache {
+    private static final Logger LOG = 
LogManager.getLogger(ExternalSchemaCache.class);
+    private ExternalCatalog catalog;
+
+    private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache;
+
+    public ExternalSchemaCache(ExternalCatalog catalog, Executor executor) {
+        this.catalog = catalog;
+        init(executor);
+        initMetrics();
+    }
+
+    private void init(Executor executor) {
+        schemaCache = 
CacheBuilder.newBuilder().maximumSize(Config.max_external_schema_cache_num)
+                
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, 
TimeUnit.MINUTES)
+                .build(CacheLoader.asyncReloading(new 
CacheLoader<SchemaCacheKey, ImmutableList<Column>>() {
+                    @Override
+                    public ImmutableList<Column> load(SchemaCacheKey key) 
throws Exception {
+                        return loadSchema(key);
+                    }
+                }, executor));
+    }
+
+    private void initMetrics() {
+        // schema cache
+        GaugeMetric<Long> schemaCacheGauge = new 
GaugeMetric<Long>("external_schema_cache",
+                Metric.MetricUnit.NOUNIT, "external schema cache number") {
+            @Override
+            public Long getValue() {
+                return schemaCache.size();
+            }
+        };
+        schemaCacheGauge.addLabel(new MetricLabel("catalog", 
catalog.getName()));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(schemaCacheGauge);
+    }
+
+    private ImmutableList<Column> loadSchema(SchemaCacheKey key) {
+        ImmutableList<Column> schema = 
ImmutableList.copyOf(catalog.getSchema(key.dbName, key.tblName));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("load schema for {} in catalog {}", key, 
catalog.getName());
+        }
+        return schema;
+    }
+
+    public List<Column> getSchema(String dbName, String tblName) {
+        SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
+        try {
+            return schemaCache.get(key);
+        } catch (ExecutionException e) {
+            throw new CacheException("failed to get schema for %s in catalog 
%s", e, key, catalog.getName());
+        }
+    }
+
+    public void invalidateCache(String dbName, String tblName) {
+        SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
+        schemaCache.invalidate(key);
+    }
+
+    @Data
+    public static class SchemaCacheKey {
+        private String dbName;
+        private String tblName;
+
+        public SchemaCacheKey(String dbName, String tblName) {
+            this.dbName = dbName;
+            this.tblName = tblName;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof SchemaCacheKey)) {
+                return false;
+            }
+            return dbName.equals(((SchemaCacheKey) obj).dbName) && 
tblName.equals(((SchemaCacheKey) obj).tblName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dbName, tblName);
+        }
+
+        @Override
+        public String toString() {
+            return "SchemaCacheKey{" + "dbName='" + dbName + '\'' + ", 
tblName='" + tblName + '\'' + '}';
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSClientException.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSClientException.java
new file mode 100644
index 0000000000..fa2652b867
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSClientException.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+public class HMSClientException extends RuntimeException {
+    public HMSClientException(String format, Throwable cause, Object... msg) {
+        super(String.format(format, msg), cause);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index f348e93787..fdb2ef2981 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource;
 
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.external.ExternalDatabase;
@@ -26,6 +27,7 @@ import org.apache.doris.cluster.ClusterNamespace;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.Nullable;
@@ -159,4 +161,17 @@ public class HMSExternalCatalog extends ExternalCatalog {
         makeSureInitialized();
         return client;
     }
+
+    @Override
+    public List<Column> getSchema(String dbName, String tblName) {
+        makeSureInitialized();
+        List<FieldSchema> schema = getClient().getSchema(dbName, tblName);
+        List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
+        for (FieldSchema field : schema) {
+            tmpSchema.add(new Column(field.getName(),
+                    
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
+                    true, null, field.getComment(), true, null, -1));
+        }
+        return tmpSchema;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
index c2f3567f56..be7e54eba1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
@@ -40,13 +40,15 @@ import java.util.List;
 import java.util.Queue;
 
 /**
- * A hive metastore client pool for a specific hive conf.
+ * A hive metastore client pool for a specific catalog with hive configuration.
  */
 public class PooledHiveMetaStoreClient {
     private static final Logger LOG = 
LogManager.getLogger(PooledHiveMetaStoreClient.class);
 
-    private Queue<CachedClient> clientPool = new LinkedList<>();
     private static final HiveMetaHookLoader DUMMY_HOOK_LOADER = t -> null;
+    private static final short MAX_LIST_PARTITION_NUM = 10000;
+
+    private Queue<CachedClient> clientPool = new LinkedList<>();
     private final int poolSize;
     private final HiveConf hiveConf;
 
@@ -62,7 +64,7 @@ public class PooledHiveMetaStoreClient {
         try (CachedClient client = getClient()) {
             return client.client.getAllDatabases();
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new HMSClientException("failed to get all database from hms 
client", e);
         }
     }
 
@@ -70,7 +72,7 @@ public class PooledHiveMetaStoreClient {
         try (CachedClient client = getClient()) {
             return client.client.getAllTables(dbName);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new HMSClientException("failed to get all tables for db %s", 
e, dbName);
         }
     }
 
@@ -78,17 +80,33 @@ public class PooledHiveMetaStoreClient {
         try (CachedClient client = getClient()) {
             return client.client.tableExists(dbName, tblName);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new HMSClientException("failed to check if table %s in db %s 
exists", e, tblName, dbName);
+        }
+    }
+
+    public List<String> listPartitionNames(String dbName, String tblName) {
+        try (CachedClient client = getClient()) {
+            return client.client.listPartitionNames(dbName, tblName, 
MAX_LIST_PARTITION_NUM);
+        } catch (Exception e) {
+            throw new HMSClientException("failed to list partition names for 
table %s in db %s", e, tblName, dbName);
+        }
+    }
+
+    public Partition getPartition(String dbName, String tblName, List<String> 
partitionValues) {
+        try (CachedClient client = getClient()) {
+            return client.client.getPartition(dbName, tblName, 
partitionValues);
+        } catch (Exception e) {
+            throw new HMSClientException("failed to get partition for table %s 
in db %s with value %s", e, tblName,
+                    dbName, partitionValues);
         }
     }
 
-    public boolean listPartitionsByExpr(String dbName, String tblName,
-            byte[] partitionPredicatesInBytes, List<Partition> hivePartitions) 
{
+    public List<Partition> getPartitionsByFilter(String dbName, String 
tblName, String filter) {
         try (CachedClient client = getClient()) {
-            return client.client.listPartitionsByExpr(dbName, tblName, 
partitionPredicatesInBytes,
-                    null, (short) -1, hivePartitions);
+            return client.client.listPartitionsByFilter(dbName, tblName, 
filter, MAX_LIST_PARTITION_NUM);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new HMSClientException("failed to get partition by filter 
for table %s in db %s", e, tblName,
+                    dbName);
         }
     }
 
@@ -96,7 +114,7 @@ public class PooledHiveMetaStoreClient {
         try (CachedClient client = getClient()) {
             return client.client.getTable(dbName, tblName);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new HMSClientException("failed to get table %s in db %s from 
hms client", e, tblName, dbName);
         }
     }
 
@@ -104,7 +122,7 @@ public class PooledHiveMetaStoreClient {
         try (CachedClient client = getClient()) {
             return client.client.getSchema(dbName, tblName);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new HMSClientException("failed to get schema for table %s in 
db %s", e, tblName, dbName);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
new file mode 100644
index 0000000000..388c847559
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric;
+import org.apache.doris.metric.MetricLabel;
+import org.apache.doris.metric.MetricRepo;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import lombok.Data;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+// The cache of a hms catalog. 3 kind of caches:
+// 1. partitionValuesCache: cache the partition values of a table, for 
partition prune.
+// 2. partitionCache: cache the partition info(location, input format, etc.) 
of a table.
+// 3. fileCache: cache the files of a location.
+public class HiveMetaStoreCache {
+    private static final Logger LOG = 
LogManager.getLogger(HiveMetaStoreCache.class);
+    private static final int MIN_BATCH_FETCH_PARTITION_NUM = 50;
+
+    private HMSExternalCatalog catalog;
+
+    // cache from <dbname-tblname> -> <values of partitions>
+    private LoadingCache<PartitionValueCacheKey, 
ImmutableList<ListPartitionItem>> partitionValuesCache;
+    // cache from <dbname-tblname-partition_values> -> <partition info>
+    private LoadingCache<PartitionCacheKey, HivePartition> partitionCache;
+    // cache from <location> -> <file list>
+    private LoadingCache<FileCacheKey, ImmutableList<InputSplit>> fileCache;
+
+    public HiveMetaStoreCache(HMSExternalCatalog catalog, Executor executor) {
+        this.catalog = catalog;
+        init(executor);
+        initMetrics();
+    }
+
+    private void init(Executor executor) {
+        partitionValuesCache = 
CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num)
+                
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, 
TimeUnit.MINUTES)
+                .build(CacheLoader.asyncReloading(
+                        new CacheLoader<PartitionValueCacheKey, 
ImmutableList<ListPartitionItem>>() {
+                            @Override
+                            public ImmutableList<ListPartitionItem> 
load(PartitionValueCacheKey key) throws Exception {
+                                return loadPartitionValues(key);
+                            }
+                        }, executor));
+
+        partitionCache = 
CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num)
+                
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, 
TimeUnit.MINUTES)
+                .build(CacheLoader.asyncReloading(new 
CacheLoader<PartitionCacheKey, HivePartition>() {
+                    @Override
+                    public HivePartition load(PartitionCacheKey key) throws 
Exception {
+                        return loadPartitions(key);
+                    }
+                }, executor));
+
+        fileCache = 
CacheBuilder.newBuilder().maximumSize(Config.max_external_file_cache_num)
+                
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, 
TimeUnit.MINUTES)
+                .build(CacheLoader.asyncReloading(new 
CacheLoader<FileCacheKey, ImmutableList<InputSplit>>() {
+                    @Override
+                    public ImmutableList<InputSplit> load(FileCacheKey key) 
throws Exception {
+                        return loadFiles(key);
+                    }
+                }, executor));
+    }
+
+    private void initMetrics() {
+        // partition value
+        GaugeMetric<Long> valueCacheGauge = new 
GaugeMetric<Long>("hive_meta_cache",
+                Metric.MetricUnit.NOUNIT, "hive partition value cache number") 
{
+            @Override
+            public Long getValue() {
+                return partitionValuesCache.size();
+            }
+        };
+        valueCacheGauge.addLabel(new MetricLabel("type", "partition_value"));
+        valueCacheGauge.addLabel(new MetricLabel("catalog", 
catalog.getName()));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(valueCacheGauge);
+        // partition
+        GaugeMetric<Long> partitionCacheGauge = new 
GaugeMetric<Long>("hive_meta_cache",
+                Metric.MetricUnit.NOUNIT, "hive partition cache number") {
+            @Override
+            public Long getValue() {
+                return partitionCache.size();
+            }
+        };
+        partitionCacheGauge.addLabel(new MetricLabel("type", "partition"));
+        partitionCacheGauge.addLabel(new MetricLabel("catalog", 
catalog.getName()));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(partitionCacheGauge);
+        // file
+        GaugeMetric<Long> fileCacheGauge = new 
GaugeMetric<Long>("hive_meta_cache",
+                Metric.MetricUnit.NOUNIT, "hive file cache number") {
+            @Override
+            public Long getValue() {
+                return fileCache.size();
+            }
+        };
+        fileCacheGauge.addLabel(new MetricLabel("type", "file"));
+        fileCacheGauge.addLabel(new MetricLabel("catalog", catalog.getName()));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(fileCacheGauge);
+    }
+
+    private ImmutableList<ListPartitionItem> 
loadPartitionValues(PartitionValueCacheKey key) {
+        // partition name format: nation=cn/city=beijing
+        List<String> partitionNames = 
catalog.getClient().listPartitionNames(key.dbName, key.tblName);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("load #{} partitions for {} in catalog {}", 
partitionNames.size(), key, catalog.getName());
+        }
+        List<ListPartitionItem> partitionValues = 
Lists.newArrayListWithExpectedSize(partitionNames.size());
+        for (String partitionName : partitionNames) {
+            partitionValues.add(toListPartitionItem(partitionName, key.types));
+        }
+        return ImmutableList.copyOf(partitionValues);
+    }
+
+    private ListPartitionItem toListPartitionItem(String partitionName, 
List<Type> types) {
+        // Partition name will be in format: nation=cn/city=beijing
+        // parse it to get values "cn" and "beijing"
+        String[] parts = partitionName.split("/");
+        Preconditions.checkState(parts.length == types.size(), partitionName + 
" vs. " + types);
+        List<PartitionValue> values = 
Lists.newArrayListWithExpectedSize(types.size());
+        for (String part : parts) {
+            String[] kv = part.split("=");
+            Preconditions.checkState(kv.length == 2, partitionName);
+            values.add(new PartitionValue(kv[1]));
+        }
+        try {
+            PartitionKey key = 
PartitionKey.createListPartitionKeyWithTypes(values, types);
+            return new ListPartitionItem(Lists.newArrayList(key));
+        } catch (AnalysisException e) {
+            throw new CacheException("failed to convert hive partition %s to 
list partition in catalog %s",
+                    e, partitionName, catalog.getName());
+        }
+    }
+
+    private HivePartition loadPartitions(PartitionCacheKey key) {
+        Partition partition = catalog.getClient().getPartition(key.dbName, 
key.tblName, key.values);
+        StorageDescriptor sd = partition.getSd();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("load partition format: {}, location: {} for {} in 
catalog {}",
+                    sd.getInputFormat(), sd.getLocation(), key, 
catalog.getName());
+        }
+        // TODO: more info?
+        return new HivePartition(sd.getInputFormat(), sd.getLocation(), 
key.values);
+    }
+
+    private ImmutableList<InputSplit> loadFiles(FileCacheKey key) {
+        String finalLocation = convertToS3IfNecessary(key.location);
+        Configuration conf = getConfiguration();
+        JobConf jobConf = new JobConf(conf);
+        // For Tez engine, it may generate subdirectories for "union" query.
+        // So there may be files and directories in the table directory at the 
same time. eg:
+        //      /user/hive/warehouse/region_tmp_union_all2/000000_0
+        //      /user/hive/warehouse/region_tmp_union_all2/1
+        //      /user/hive/warehouse/region_tmp_union_all2/2
+        // So we need to set this config to support visit dir recursively.
+        // Otherwise, getSplits() may throw exception: "Not a file xxx"
+        // 
https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
+        jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", 
"true");
+        FileInputFormat.setInputPaths(jobConf, finalLocation);
+        try {
+            InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(conf, 
key.inputFormat, false);
+            InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("load #{} files for {} in catalog {}", 
splits.length, key, catalog.getName());
+            }
+            return ImmutableList.copyOf(splits);
+        } catch (Exception e) {
+            throw new CacheException("failed to get input splits for %s in 
catalog %s", e, key, catalog.getName());
+        }
+    }
+
+    // convert oss:// to s3://
+    private String convertToS3IfNecessary(String location) {
+        LOG.debug("try convert location to s3 prefix: " + location);
+        if (location.startsWith(FeConstants.FS_PREFIX_COS)
+                || location.startsWith(FeConstants.FS_PREFIX_BOS)
+                || location.startsWith(FeConstants.FS_PREFIX_BOS)
+                || location.startsWith(FeConstants.FS_PREFIX_OSS)
+                || location.startsWith(FeConstants.FS_PREFIX_S3A)
+                || location.startsWith(FeConstants.FS_PREFIX_S3N)) {
+            int pos = location.indexOf("://");
+            if (pos == -1) {
+                throw new RuntimeException("No '://' found in location: " + 
location);
+            }
+            return "s3" + location.substring(pos);
+        }
+        return location;
+    }
+
+    private Configuration getConfiguration() {
+        Configuration configuration = new HdfsConfiguration();
+        for (Map.Entry<String, String> entry : 
catalog.getCatalogProperty().getProperties().entrySet()) {
+            configuration.set(entry.getKey(), entry.getValue());
+        }
+        Map<String, String> s3Properties = 
catalog.getCatalogProperty().getS3Properties();
+        for (Map.Entry<String, String> entry : s3Properties.entrySet()) {
+            configuration.set(entry.getKey(), entry.getValue());
+        }
+        return configuration;
+    }
+
+    public ImmutableList<ListPartitionItem> getPartitionValues(String dbName, 
String tblName, List<Type> types) {
+        PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, 
tblName, types);
+        try {
+            return partitionValuesCache.get(key);
+        } catch (ExecutionException e) {
+            throw new CacheException("failed to get partition values for %s in 
catalog %s", e, key, catalog.getName());
+        }
+    }
+
+    public List<InputSplit> getFilesByPartitions(List<HivePartition> 
partitions) {
+        long start = System.currentTimeMillis();
+        List<FileCacheKey> keys = 
Lists.newArrayListWithExpectedSize(partitions.size());
+        partitions.stream().forEach(p -> keys.add(new 
FileCacheKey(p.getPath(), p.getInputFormat())));
+
+        Stream<FileCacheKey> stream;
+        if (partitions.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
+            stream = keys.stream();
+        } else {
+            stream = keys.parallelStream();
+        }
+        List<ImmutableList<InputSplit>> fileLists = stream.map(k -> {
+            try {
+                return fileCache.get(k);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }).collect(Collectors.toList());
+        List<InputSplit> retFiles = Lists.newArrayListWithExpectedSize(
+                fileLists.stream().mapToInt(l -> l.size()).sum());
+        fileLists.stream().forEach(l -> retFiles.addAll(l));
+        LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} 
ms",
+                retFiles.size(), partitions.size(), catalog.getName(), 
(System.currentTimeMillis() - start));
+        return retFiles;
+    }
+
+    public List<HivePartition> getAllPartitions(String dbName, String name, 
List<List<String>> partitionValuesList) {
+        long start = System.currentTimeMillis();
+        List<PartitionCacheKey> keys = 
Lists.newArrayListWithExpectedSize(partitionValuesList.size());
+        partitionValuesList.stream().forEach(p -> keys.add(new 
PartitionCacheKey(dbName, name, p)));
+
+        Stream<PartitionCacheKey> stream;
+        if (partitionValuesList.size() < MIN_BATCH_FETCH_PARTITION_NUM) {
+            stream = keys.stream();
+        } else {
+            stream = keys.parallelStream();
+        }
+        List<HivePartition> partitions = stream.map(k -> {
+            try {
+                return partitionCache.get(k);
+            } catch (ExecutionException e) {
+                throw new CacheException("failed to get partition for %s in 
catalog %s", e, k, catalog.getName());
+            }
+        }).collect(Collectors.toList());
+        LOG.debug("get #{} partitions in catalog {} cost: {} ms", 
partitions.size(), catalog.getName(),
+                (System.currentTimeMillis() - start));
+        return partitions;
+    }
+
+    public void invalidateCache(String dbName, String tblName) {
+        PartitionValueCacheKey key = new PartitionValueCacheKey(dbName, 
tblName, null);
+        partitionValuesCache.invalidate(key);
+        // TODO: find a way to invalidate partitionCache and fileCache
+    }
+
+    /**
+     * The Key of hive partition value cache
+     */
+    @Data
+    public static class PartitionValueCacheKey {
+        private String dbName;
+        private String tblName;
+        // not in key
+        private List<Type> types;
+
+        public PartitionValueCacheKey(String dbName, String tblName, 
List<Type> types) {
+            this.dbName = dbName;
+            this.tblName = tblName;
+            this.types = types;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof PartitionValueCacheKey)) {
+                return false;
+            }
+            return dbName.equals(((PartitionValueCacheKey) obj).dbName)
+                    && tblName.equals(((PartitionValueCacheKey) obj).tblName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dbName, tblName);
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionValueCacheKey{" + "dbName='" + dbName + '\'' + ", 
tblName='" + tblName + '\'' + '}';
+        }
+    }
+
+    @Data
+    public static class PartitionCacheKey {
+        private String dbName;
+        private String tblName;
+        private List<String> values;
+
+        public PartitionCacheKey(String dbName, String tblName, List<String> 
values) {
+            this.dbName = dbName;
+            this.tblName = tblName;
+            this.values = values;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof PartitionCacheKey)) {
+                return false;
+            }
+            return dbName.equals(((PartitionCacheKey) obj).dbName)
+                    && tblName.equals(((PartitionCacheKey) obj).tblName)
+                    && Objects.equals(values, ((PartitionCacheKey) 
obj).values);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dbName, tblName, values);
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionCacheKey{" + "dbName='" + dbName + '\'' + ", 
tblName='" + tblName + '\'' + ", values="
+                    + values + '}';
+        }
+    }
+
+    @Data
+    public static class FileCacheKey {
+        private String location;
+        // not in key
+        private String inputFormat;
+
+        public FileCacheKey(String location, String inputFormat) {
+            this.location = location;
+            this.inputFormat = inputFormat;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof FileCacheKey)) {
+                return false;
+            }
+            return location.equals(((FileCacheKey) obj).location);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(location);
+        }
+
+        @Override
+        public String toString() {
+            return "FileCacheKey{" + "location='" + location + '\'' + ", 
inputFormat='" + inputFormat + '\'' + '}';
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
new file mode 100644
index 0000000000..e5b5178e75
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class HivePartition {
+    private String inputFormat;
+    private String path;
+    private List<String> partitionValues;
+
+    public HivePartition(String inputFormat, String path, List<String> 
partitionValues) {
+        // eg: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
+        this.inputFormat = inputFormat;
+        // eg: 
hdfs://hk-dev01:8121/user/doris/parquet/partition_table/nation=cn/city=beijing
+        this.path = path;
+        // eg: cn, beijing
+        this.partitionValues = partitionValues;
+    }
+
+    @Override
+    public String toString() {
+        return "HivePartition{"
+                + "inputFormat='" + inputFormat + '\''
+                + ", path='" + path + '\''
+                + ", partitionValues=" + partitionValues + '}';
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 39bd5e7dfd..c93ea8d14a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -45,7 +45,6 @@ import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.InitDatabaseLog;
-import org.apache.doris.datasource.InitTableLog;
 import org.apache.doris.ha.MasterInfo;
 import org.apache.doris.journal.Journal;
 import org.apache.doris.journal.JournalCursor;
@@ -951,8 +950,7 @@ public class EditLog {
                     break;
                 }
                 case OperationType.OP_INIT_EXTERNAL_TABLE: {
-                    final InitTableLog log = (InitTableLog) journal.getData();
-                    env.getCatalogMgr().replayInitExternalTable(log);
+                    // Do nothing.
                     break;
                 }
                 default: {
@@ -1626,10 +1624,6 @@ public class EditLog {
         logEdit(OperationType.OP_REFRESH_EXTERNAL_TABLE, log);
     }
 
-    public void logInitExternalTable(InitTableLog log) {
-        logEdit(OperationType.OP_INIT_EXTERNAL_TABLE, log);
-    }
-
     public Journal getJournal() {
         return this.journal;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 82304acb23..6204fc1836 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -243,6 +243,7 @@ public class OperationType {
     public static final short OP_REFRESH_EXTERNAL_DB = 326;
     public static final short OP_INIT_EXTERNAL_DB = 327;
     public static final short OP_REFRESH_EXTERNAL_TABLE = 328;
+    @Deprecated
     public static final short OP_INIT_EXTERNAL_TABLE = 329;
 
     // scheduler job and task 330-350
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index c522823622..8523830c66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -141,7 +141,6 @@ public abstract class ScanNode extends PlanNode {
                     columnNameToRange.put(column.getName(), columnRange);
                 }
             }
-
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 45a7b21747..0cf6661846 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -164,6 +164,8 @@ public class ExternalFileScanNode extends ExternalScanNode {
 
         switch (type) {
             case QUERY:
+                // prepare for partition prune
+                computeColumnFilter();
                 if (this.desc.getTable() instanceof HMSExternalTable) {
                     HMSExternalTable hmsTable = (HMSExternalTable) 
this.desc.getTable();
                     initHMSExternalTable(hmsTable);
@@ -199,13 +201,13 @@ public class ExternalFileScanNode extends 
ExternalScanNode {
         FileScanProviderIf scanProvider;
         switch (hmsTable.getDlaType()) {
             case HUDI:
-                scanProvider = new HudiScanProvider(hmsTable, desc);
+                scanProvider = new HudiScanProvider(hmsTable, desc, 
columnNameToRange);
                 break;
             case ICEBERG:
-                scanProvider = new IcebergScanProvider(hmsTable, desc);
+                scanProvider = new IcebergScanProvider(hmsTable, desc, 
columnNameToRange);
                 break;
             case HIVE:
-                scanProvider = new HiveScanProvider(hmsTable, desc);
+                scanProvider = new HiveScanProvider(hmsTable, desc, 
columnNameToRange);
                 break;
             default:
                 throw new UserException("Unknown table type: " + 
hmsTable.getDlaType());
@@ -511,9 +513,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
 
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
-        StringBuilder output = new StringBuilder(prefix);
-        // output.append(fileTable.getExplainString(prefix));
-
+        StringBuilder output = new StringBuilder();
         if (!conjuncts.isEmpty()) {
             output.append(prefix).append("predicates: 
").append(getExplainString(conjuncts)).append("\n");
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index a5e8a7aabc..17e6d3417f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -22,16 +22,23 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.HiveBucketUtil;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.ListPartitionPrunerV2;
 import 
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
 import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileFormatType;
@@ -40,23 +47,20 @@ import org.apache.doris.thrift.TFileScanSlotInfo;
 import org.apache.doris.thrift.TFileTextScanRangeParams;
 import org.apache.doris.thrift.TFileType;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -75,9 +79,13 @@ public class HiveScanProvider extends HMSTableScanProvider {
 
     protected final TupleDescriptor desc;
 
-    public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) {
+    protected Map<String, ColumnRange> columnNameToRange;
+
+    public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
+            Map<String, ColumnRange> columnNameToRange) {
         this.hmsTable = hmsTable;
         this.desc = desc;
+        this.columnNameToRange = columnNameToRange;
     }
 
     @Override
@@ -127,80 +135,69 @@ public class HiveScanProvider extends 
HMSTableScanProvider {
     }
 
     @Override
-    public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, 
UserException {
-        // eg:
-        // oss://buckts/data_dir
-        // hdfs://hosts/data_dir
-        String location = getRemoteHiveTable().getSd().getLocation();
-        List<String> partitionKeys = 
getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName)
-                .collect(Collectors.toList());
-        List<Partition> hivePartitions = new ArrayList<>();
-
-        if (partitionKeys.size() > 0) {
-            ExprNodeGenericFuncDesc hivePartitionPredicate = 
HiveMetaStoreClientHelper.convertToHivePartitionExpr(exprs,
-                    partitionKeys, hmsTable.getName());
-            
hivePartitions.addAll(hmsTable.getHivePartitions(hivePartitionPredicate));
-        }
+    public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
+        long start = System.currentTimeMillis();
+        try {
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) 
hmsTable.getCatalog());
+            // 1. get ListPartitionItems from cache
+            ImmutableList<ListPartitionItem> partitionItems;
+            List<Type> partitionColumnTypes = 
hmsTable.getPartitionColumnTypes();
+            if (!partitionColumnTypes.isEmpty()) {
+                partitionItems = 
cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
+                        partitionColumnTypes);
+            } else {
+                partitionItems = ImmutableList.of();
+            }
 
-        String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
+            List<InputSplit> allFiles = Lists.newArrayList();
+            if (!partitionItems.isEmpty()) {
+                // 2. prune partitions by expr
+                Map<Long, PartitionItem> keyItemMap = Maps.newHashMap();
+                long pid = 0;
+                for (ListPartitionItem partitionItem : partitionItems) {
+                    keyItemMap.put(pid++, partitionItem);
+                }
+                ListPartitionPrunerV2 pruner = new 
ListPartitionPrunerV2(keyItemMap,
+                        hmsTable.getPartitionColumns(), columnNameToRange);
+                Collection<Long> filteredPartitionIds = pruner.prune();
 
-        Configuration configuration = setConfiguration();
-        InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(configuration, 
inputFormatName, false);
-        List<InputSplit> splits;
-        if (!hivePartitions.isEmpty()) {
-            try {
-                splits = hivePartitions.stream().flatMap(x -> {
-                    try {
-                        return getSplitsByPath(inputFormat, configuration, 
x.getSd().getLocation()).stream();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                }).collect(Collectors.toList());
-            } catch (RuntimeException e) {
-                throw new IOException(e);
+                // 3. get partitions from cache
+                List<List<String>> partitionValuesList = 
Lists.newArrayListWithCapacity(filteredPartitionIds.size());
+                for (Long id : filteredPartitionIds) {
+                    ListPartitionItem listPartitionItem = (ListPartitionItem) 
keyItemMap.get(id);
+                    
partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
+                }
+                List<HivePartition> partitions = 
cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
+                        partitionValuesList);
+                // 4. get all files of partitions
+                getFileSplitByPartitions(cache, partitions, allFiles);
+            } else {
+                // unpartitioned table, create a dummy partition to save 
location and inputformat,
+                // so that we can unify the interface.
+                HivePartition dummyPartition = new 
HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
+                        hmsTable.getRemoteTable().getSd().getLocation(), null);
+                getFileSplitByPartitions(cache, 
Lists.newArrayList(dummyPartition), allFiles);
             }
-        } else {
-            splits = getSplitsByPath(inputFormat, configuration, location);
+            LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
+                    allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), 
(System.currentTimeMillis() - start));
+            return allFiles;
+        } catch (Throwable t) {
+            LOG.warn("get file split failed for table: {}", 
hmsTable.getName(), t);
+            throw new UserException("get file split failed for table: " + 
hmsTable.getName(), t);
         }
-        return HiveBucketUtil.getPrunedSplitsByBuckets(splits, 
hmsTable.getName(), exprs,
-                getRemoteHiveTable().getSd().getBucketCols(), 
getRemoteHiveTable().getSd().getNumBuckets(),
-                getRemoteHiveTable().getParameters());
-    }
-
-    private List<InputSplit> getSplitsByPath(InputFormat<?, ?> inputFormat, 
Configuration configuration,
-            String location) throws IOException {
-        String finalLocation = convertToS3IfNecessary(location);
-        JobConf jobConf = new JobConf(configuration);
-        // For Tez engine, it may generate subdirectoies for "union" query.
-        // So there may be files and directories in the table directory at the 
same time. eg:
-        //      /user/hive/warehouse/region_tmp_union_all2/000000_0
-        //      /user/hive/warehouse/region_tmp_union_all2/1
-        //      /user/hive/warehouse/region_tmp_union_all2/2
-        // So we need to set this config to support visit dir recursively.
-        // Otherwise, getSplits() may throw exception: "Not a file xxx"
-        // 
https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
-        jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", 
"true");
-        FileInputFormat.setInputPaths(jobConf, finalLocation);
-        InputSplit[] splits = inputFormat.getSplits(jobConf, 0);
-        return Lists.newArrayList(splits);
     }
 
-    // convert oss:// to s3://
-    private String convertToS3IfNecessary(String location) throws IOException {
-        LOG.debug("try convert location to s3 prefix: " + location);
-        if (location.startsWith(FeConstants.FS_PREFIX_COS)
-                || location.startsWith(FeConstants.FS_PREFIX_BOS)
-                || location.startsWith(FeConstants.FS_PREFIX_BOS)
-                || location.startsWith(FeConstants.FS_PREFIX_OSS)
-                || location.startsWith(FeConstants.FS_PREFIX_S3A)
-                || location.startsWith(FeConstants.FS_PREFIX_S3N)) {
-            int pos = location.indexOf("://");
-            if (pos == -1) {
-                throw new IOException("No '://' found in location: " + 
location);
-            }
-            return "s3" + location.substring(pos);
+    private void getFileSplitByPartitions(HiveMetaStoreCache cache, 
List<HivePartition> partitions,
+            List<InputSplit> allFiles) {
+        List<InputSplit> files = cache.getFilesByPartitions(partitions);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("get #{} files from #{} partitions: {}: {}", 
files.size(), partitions.size(),
+                    Joiner.on(",")
+                            .join(files.stream().limit(10).map(f -> 
((FileSplit) f).getPath())
+                                    .collect(Collectors.toList())));
         }
-        return location;
+        allFiles.addAll(files);
     }
 
     protected Configuration setConfiguration() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
index d4c95b2549..59274ec521 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
@@ -21,10 +21,12 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.thrift.TFileFormatType;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A file scan provider for hudi.
@@ -32,8 +34,9 @@ import java.util.List;
  */
 public class HudiScanProvider extends HiveScanProvider {
 
-    public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc) {
-        super(hmsTable, desc);
+    public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
+            Map<String, ColumnRange> columnNameToRange) {
+        super(hmsTable, desc, columnNameToRange);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index f82e277f41..37a5266559 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.thrift.TFileFormatType;
 
 import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,6 @@ import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -48,8 +48,9 @@ import java.util.Map;
  */
 public class IcebergScanProvider extends HiveScanProvider {
 
-    public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor 
desc) {
-        super(hmsTable, desc);
+    public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
+            Map<String, ColumnRange> columnNameToRange) {
+        super(hmsTable, desc, columnNameToRange);
     }
 
     @Override
@@ -69,7 +70,7 @@ public class IcebergScanProvider extends HiveScanProvider {
     }
 
     @Override
-    public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, 
UserException {
+    public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
         List<Expression> expressions = new ArrayList<>();
         for (Expr conjunct : exprs) {
             Expression expression = 
IcebergUtils.convertToIcebergExpr(conjunct);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index eae1829603..561b2de316 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -39,8 +39,8 @@ import org.apache.doris.thrift.TScanRangeLocations;
 import com.google.common.base.Joiner;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.List;
@@ -56,6 +56,7 @@ public abstract class QueryScanProvider implements 
FileScanProviderIf {
     @Override
     public void createScanRangeLocations(ParamCreateContext context, 
BackendPolicy backendPolicy,
             List<TScanRangeLocations> scanRangeLocations) throws UserException 
{
+        long start = System.currentTimeMillis();
         try {
             List<InputSplit> inputSplits = getSplits(context.conjuncts);
             this.inputSplitNum = inputSplits.size();
@@ -100,10 +101,9 @@ public abstract class QueryScanProvider implements 
FileScanProviderIf {
                 TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath, pathPartitionKeys);
 
                 
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-                LOG.info(
-                        "Assign to backend " + 
curLocations.getLocations().get(0).getBackendId() + " with table split: "
-                                + fileSplit.getPath() + " ( " + 
fileSplit.getStart() + "," + fileSplit.getLength() + ")"
-                                + " loaction: " + 
Joiner.on("|").join(split.getLocations()));
+                LOG.debug("assign to backend {} with table split: {} ({}, {}), 
location: {}",
+                        curLocations.getLocations().get(0).getBackendId(), 
fileSplit.getPath(), fileSplit.getStart(),
+                        fileSplit.getLength(), 
Joiner.on("|").join(split.getLocations()));
 
                 fileSplitStrategy.update(fileSplit);
                 // Add a new location when it's can be split
@@ -117,6 +117,8 @@ public abstract class QueryScanProvider implements 
FileScanProviderIf {
             if 
(curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize()
 > 0) {
                 scanRangeLocations.add(curLocations);
             }
+            LOG.debug("create #{} ScanRangeLocations cost: {} ms",
+                    scanRangeLocations.size(), (System.currentTimeMillis() - 
start));
         } catch (IOException e) {
             throw new UserException(e);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 58a8e10478..c22cb0c2d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -583,7 +583,7 @@ public class StmtExecutor implements ProfileWriter {
             throw e;
         } catch (UserException e) {
             // analysis exception only print message, not print the stack
-            LOG.warn("execute Exception. {}, {}", 
context.getQueryIdentifier(), e.getMessage(), e);
+            LOG.warn("execute Exception. {}, {}", 
context.getQueryIdentifier(), e.getMessage());
             context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
             context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
         } catch (Exception e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to