This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9eedda343b0993a7059abc36a0b8f207678fc04c
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Thu Jul 13 22:30:07 2023 +0800

    [feature](hudi) support hudi time travel in external table (#21739)
    
    Support hudi time travel in external table:
    ```
    select * from hudi_table for time as of '20230712221248';
    ```
    PR (https://github.com/apache/doris/pull/15418) supports taking a timestamp or a version as the snapshot ID for Iceberg, but Hudi only identifies snapshots by timestamp. Therefore, when querying a Hudi table with `for version as of`, an error is thrown like:
    ```
    ERROR 1105 (HY000): errCode = 2, detailMessage = Hudi table only supports timestamp as snapshot ID
    ```
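    For example, a version-based query such as the following (the table name `hudi_table` is only illustrative) hits this error, since Hudi identifies snapshots solely by instant timestamp:
    ```
    select * from hudi_table for version as of 123456;
    ```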
    The supported timestamp formats in Hudi are 'yyyy-MM-dd HH:mm:ss[.SSS]', 'yyyy-MM-dd', and 'yyyyMMddHHmmss[SSS]', which is consistent with Hudi's [time travel query](https://hudi.apache.org/docs/quick-start-guide#time-travel-query) documentation.
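    As an illustration (hypothetical table name), each supported format can be used directly; per the conversion logic in this PR, the date-only form is treated as the start of that day ('yyyyMMdd000000'):
    ```
    select * from hudi_table for time as of '2023-07-12 22:12:48';
    select * from hudi_table for time as of '2023-07-12';
    select * from hudi_table for time as of '20230712221248';
    ```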
    
    ## Partitioning Strategies
    Before this PR, Hudi partitions had to be synchronized to Hive through the [hive-sync-tool](https://hudi.apache.org/docs/syncing_metastore/#hive-sync-tool), or by setting fairly complex synchronization parameters in the [spark conf](https://hudi.apache.org/docs/syncing_metastore/#sync-template). These processes are cumbersome and unnecessary unless you want to query Hudi data through Hive.
    
    In addition, partitions change across snapshots, so partition synchronization cannot guarantee the correctness of time travel.
    
    Therefore, this PR obtains partitions directly by reading Hudi metadata, caches and updates table partition information keyed by the Hudi instant timestamp, and reuses Doris' partition pruning.
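    As a sketch of the intended usage (table and partition column names are hypothetical), a time travel query with a partition predicate can now be pruned against the partition values cached from Hudi metadata:
    ```
    select * from hudi_table for time as of '20230712221248' where city = 'beijing';
    ```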
---
 docs/en/docs/lakehouse/multi-catalog/hudi.md       |  22 +-
 docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md    |  22 +-
 fe/be-java-extensions/hudi-scanner/pom.xml         |   3 +-
 .../org/apache/doris/hudi/BaseSplitReader.scala    |   8 +-
 .../java/org/apache/doris/analysis/TableRef.java   |  32 ++-
 .../org/apache/doris/analysis/TableSnapshot.java   |   6 +-
 .../doris/catalog/HiveMetaStoreClientHelper.java   |  13 +-
 .../java/org/apache/doris/catalog/HudiUtils.java   |  34 +++
 .../java/org/apache/doris/common/ErrorCode.java    |   2 +-
 .../org/apache/doris/datasource/CatalogMgr.java    |   2 +-
 .../doris/datasource/ExternalMetaCacheMgr.java     |  21 +-
 .../doris/planner/external/FileQueryScanNode.java  |   7 +-
 .../planner/external/TablePartitionValues.java     | 255 +++++++++++++++++++++
 .../hudi/HudiCachedPartitionProcessor.java         | 131 +++++++++++
 .../planner/external/hudi/HudiPartitionMgr.java    |  86 +++++++
 .../external/hudi/HudiPartitionProcessor.java      | 124 ++++++++++
 .../doris/planner/external/hudi/HudiScanNode.java  |  84 ++++++-
 17 files changed, 809 insertions(+), 43 deletions(-)

diff --git a/docs/en/docs/lakehouse/multi-catalog/hudi.md 
b/docs/en/docs/lakehouse/multi-catalog/hudi.md
index 890e7fd92c..4c46ccb0e1 100644
--- a/docs/en/docs/lakehouse/multi-catalog/hudi.md
+++ b/docs/en/docs/lakehouse/multi-catalog/hudi.md
@@ -29,12 +29,12 @@ under the License.
 
 ## Usage
 
-1. Doris supports Snapshot Query on Copy-on-Write Hudi tables and Read 
Optimized Query / Snapshot on Merge-on-Read tables. In the future, it will 
support Incremental Query and Time Travel.
+1. The query types supported by Hudi tables are as follows. Incremental Query will be supported in the future.
 
 |  Table Type   | Supported Query types  |
 |  ----  | ----  |
-| Copy On Write  | Snapshot Query |
-| Merge On Read  | Snapshot Queries + Read Optimized Queries |
+| Copy On Write  | Snapshot Query + Time Travel |
+| Merge On Read  | Snapshot Queries + Read Optimized Queries + Time Travel |
 
 2. Doris supports Hive Metastore(Including catalogs compatible with Hive 
MetaStore, like [AWS Glue](./hive.md)/[Alibaba DLF](./dlf.md)) Catalogs.
 
@@ -82,3 +82,19 @@ Users can view the performance of Java SDK through [profile](../../admin-manual/ht
 2. `JavaScanTime`: Time to read data by Java SDK
 3. `FillBlockTime`: Time to convert Java column data into C++ column data
 4. `GetRecordReaderTime`: Time to create and initialize Hudi Record Reader
+
+## Time Travel
+
+Supports reading a specified snapshot of a Hudi table.
+
+Every write operation to the Hudi table will generate a new snapshot.
+
+By default, query requests will only read the latest snapshot.
+
+You can use the `FOR TIME AS OF` statement to read historical data based on the snapshot time. Examples are as follows:
+
+`SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";`
+
+`SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";`
+
+Hudi tables do not support the `FOR VERSION AS OF` statement; using this syntax to query a Hudi table will throw an error.
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md 
b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
index d93c4268d8..228be87420 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
@@ -29,12 +29,12 @@ under the License.
 
 ## 使用限制
 
-1. 目前支持 Copy On Write 表的 Snapshot Query,以及 Merge On Read 表的 Snapshot Queries 和 
Read Optimized Query。后续将支持 Incremental Query 和 Time Travel。
+1. Hudi 表支持的查询类型如下,后续将支持 Incremental Query。
 
 |  表类型   | 支持的查询类型  |
 |  ----  | ----  |
-| Copy On Write  | Snapshot Query |
-| Merge On Read  | Snapshot Queries + Read Optimized Queries |
+| Copy On Write  | Snapshot Query + Time Travel |
+| Merge On Read  | Snapshot Queries + Read Optimized Queries + Time Travel |
 
 2. 目前支持 Hive Metastore 和兼容 Hive Metastore 类型(例如[AWS Glue](./hive.md)/[Alibaba 
DLF](./dlf.md))的 Catalog。
 
@@ -83,3 +83,19 @@ Doris 使用 parquet native reader 读取 COW 表的数据文件,使用 Java S
 2. `JavaScanTime`: Java SDK 读取数据的时间
 3. `FillBlockTime`: Java 数据拷贝为 C++ 数据的时间
 4. `GetRecordReaderTime`: 调用 Java SDK 并创建 Hudi Record Reader 的时间
+
+## Time Travel
+
+支持读取 Hudi 表指定的 Snapshot。
+
+每一次对 Hudi 表的写操作都会产生一个新的快照。
+
+默认情况下,查询请求只会读取最新版本的快照。
+
+可以使用 `FOR TIME AS OF` 
语句,根据快照的时间([时间格式](https://hudi.apache.org/docs/quick-start-guide#time-travel-query)和Hudi官网保持一致)读取历史版本的数据。示例如下:
+
+`SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";`
+
+`SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";`
+
+Hudi 表不支持 `FOR VERSION AS OF` 语句,使用该语法查询 Hudi 表将抛错。
diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml 
b/fe/be-java-extensions/hudi-scanner/pom.xml
index 1b19da9887..b27c269c23 100644
--- a/fe/be-java-extensions/hudi-scanner/pom.xml
+++ b/fe/be-java-extensions/hudi-scanner/pom.xml
@@ -36,7 +36,6 @@ under the License.
         <sparkbundle.version>3.2</sparkbundle.version>
         <hudi.version>0.13.0</hudi.version>
         <janino.version>3.0.16</janino.version>
-        <fasterxml.jackson.version>2.14.3</fasterxml.jackson.version>
     </properties>
 
     <dependencies>
@@ -163,7 +162,7 @@ under the License.
             <!-- version of spark's jackson module is error -->
             <groupId>com.fasterxml.jackson.module</groupId>
             
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
-            <version>${fasterxml.jackson.version}</version>
+            <version>${jackson.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.guava</groupId>
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
index e2ac89fab8..a4f67feddf 100644
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -211,9 +211,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
 
   protected lazy val partitionColumns: Array[String] = 
tableConfig.getPartitionFields.orElse(Array.empty)
 
-  protected lazy val specifiedQueryTimestamp: Option[String] =
-    optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
-      .map(HoodieSqlCommonUtils.formatQueryInstant)
+  protected lazy val specifiedQueryTimestamp: Option[String] = 
Some(split.instantTime)
 
   private def queryTimestamp: Option[String] =
     
specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp))
@@ -624,9 +622,7 @@ object BaseSplitReader {
         // NOTE: We're including compaction here since it's not considering a 
"commit" operation
         val timeline = 
metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants
 
-        val specifiedQueryTimestamp: Option[String] =
-          
split.optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
-            .map(HoodieSqlCommonUtils.formatQueryInstant)
+        val specifiedQueryTimestamp: Option[String] = Some(split.instantTime)
         val schemaResolver = new TableSchemaResolver(metaClient)
         val internalSchemaOpt = if 
(!isSchemaEvolutionEnabledOnRead(split.optParams, sparkSession)) {
           None
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index fcfbd39b44..730826fa2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -21,6 +21,7 @@
 package org.apache.doris.analysis;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HudiUtils;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalTable;
@@ -537,15 +538,28 @@ public class TableRef implements ParseNode, Writable {
             
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE);
         }
         HMSExternalTable extTable = (HMSExternalTable) this.getTable();
-        if (extTable.getDlaType() != HMSExternalTable.DLAType.ICEBERG) {
-            
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE);
-        }
-        if (tableSnapshot.getType() == TableSnapshot.VersionType.TIME) {
-            String asOfTime = tableSnapshot.getTime();
-            Matcher matcher = TimeUtils.DATETIME_FORMAT_REG.matcher(asOfTime);
-            if (!matcher.matches()) {
-                throw new AnalysisException("Invalid datetime string: " + 
asOfTime);
-            }
+        switch (extTable.getDlaType()) {
+            case ICEBERG:
+                if (tableSnapshot.getType() == TableSnapshot.VersionType.TIME) 
{
+                    String asOfTime = tableSnapshot.getTime();
+                    Matcher matcher = 
TimeUtils.DATETIME_FORMAT_REG.matcher(asOfTime);
+                    if (!matcher.matches()) {
+                        throw new AnalysisException("Invalid datetime string: 
" + asOfTime);
+                    }
+                }
+                break;
+            case HUDI:
+                if (tableSnapshot.getType() == 
TableSnapshot.VersionType.VERSION) {
+                    throw new AnalysisException("Hudi table only supports 
timestamp as snapshot ID");
+                }
+                try {
+                    
tableSnapshot.setTime(HudiUtils.formatQueryInstant(tableSnapshot.getTime()));
+                } catch (Exception e) {
+                    throw new AnalysisException("Failed to parse hudi 
timestamp: " + e.getMessage(), e);
+                }
+                break;
+            default:
+                
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java
index e5d43b6d0b..0a851a9fd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java
@@ -19,7 +19,7 @@ package org.apache.doris.analysis;
 
 /**
  * Snapshot read for time travel
- * the version in 2022.12.28 just supports external iceberg table
+ * supports external iceberg/hudi table
  */
 public class TableSnapshot {
 
@@ -55,6 +55,10 @@ public class TableSnapshot {
         return time;
     }
 
+    public void setTime(String time) {
+        this.time = time;
+    }
+
     public long getVersion() {
         return version;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 232ce4f285..fbb3c71b29 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -922,10 +922,7 @@ public class HiveMetaStoreClientHelper {
         return hudiSchema;
     }
 
-    public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
-        String hudiBasePath = table.getRemoteTable().getSd().getLocation();
-
-        Configuration conf = getConfiguration(table);
+    public static UserGroupInformation getUserGroupInformation(Configuration 
conf) {
         UserGroupInformation ugi = null;
         String authentication = 
conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null);
         if (AuthType.KERBEROS.getDesc().equals(authentication)) {
@@ -945,6 +942,14 @@ public class HiveMetaStoreClientHelper {
                 ugi = UserGroupInformation.createRemoteUser(hadoopUserName);
             }
         }
+        return ugi;
+    }
+
+    public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) {
+        String hudiBasePath = table.getRemoteTable().getSd().getLocation();
+
+        Configuration conf = getConfiguration(table);
+        UserGroupInformation ugi = getUserGroupInformation(conf);
         HoodieTableMetaClient metaClient;
         if (ugi != null) {
             try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java
index 0799f7b137..e52b52ab8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java
@@ -22,12 +22,18 @@ import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
 public class HudiUtils {
+    private static final SimpleDateFormat defaultDateFormat = new 
SimpleDateFormat("yyyy-MM-dd");
+
     public static String fromAvroHudiTypeToHiveTypeString(Schema avroSchema) {
         Schema.Type columnType = avroSchema.getType();
         LogicalType logicalType = avroSchema.getLogicalType();
@@ -166,4 +172,32 @@ public class HudiUtils {
         }
         return Type.UNSUPPORTED;
     }
+
+    /**
+     * Convert different query instant time format to the commit time format.
+     * Currently we support three kinds of instant time format for time travel 
query:
+     * 1、yyyy-MM-dd HH:mm:ss
+     * 2、yyyy-MM-dd
+     * This will convert to 'yyyyMMdd000000'.
+     * 3、yyyyMMddHHmmss
+     */
+    public static String formatQueryInstant(String queryInstant) throws 
ParseException {
+        int instantLength = queryInstant.length();
+        if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd 
HH:mm:ss[.SSS]
+            if (instantLength == 19) {
+                queryInstant += ".000";
+            }
+            return 
HoodieInstantTimeGenerator.getInstantForDateString(queryInstant);
+        } else if (instantLength == 
HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
+                || instantLength == 
HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for 
yyyyMMddHHmmss[SSS]
+            HoodieActiveTimeline.parseDateFromInstantTime(queryInstant); // 
validate the format
+            return queryInstant;
+        } else if (instantLength == 10) { // for yyyy-MM-dd
+            return 
HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant));
+        } else {
+            throw new IllegalArgumentException("Unsupported query instant time 
format: " + queryInstant
+                    + ", Supported time format are: 'yyyy-MM-dd 
HH:mm:ss[.SSS]' "
+                    + "or 'yyyy-MM-dd' or 'yyyyMMddHHmmss[SSS]'");
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
index f773bfd5ff..c827a41314 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -1186,7 +1186,7 @@ public enum ErrorCode {
     ERR_TABLE_NAME_LENGTH_LIMIT(5089, new byte[]{'4', '2', '0', '0', '0'}, 
"Table name length exceeds limit, "
      + "the length of table name '%s' is %d which is greater than the 
configuration 'table_name_length_limit' (%d)."),
 
-    ERR_NONSUPPORT_TIME_TRAVEL_TABLE(5090, new byte[]{'4', '2', '0', '0', 
'0'}, "Only iceberg external"
+    ERR_NONSUPPORT_TIME_TRAVEL_TABLE(5090, new byte[]{'4', '2', '0', '0', 
'0'}, "Only iceberg/hudi external"
      + " table supports time travel in current version"),
 
     ERR_NONSSL_HANDSHAKE_RESPONSE(5091, new byte[] {'4', '2', '0', '0'},
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index 96a7c1eae3..3ed74e260b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -128,7 +128,7 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
             catalog.onClose();
             nameToCatalog.remove(catalog.getName());
             lastDBOfCatalog.remove(catalog.getName());
-            
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getName());
+            
Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId());
             if (!Strings.isNullOrEmpty(catalog.getResource())) {
                 Resource catalogResource = 
Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource());
                 if (catalogResource != null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index f5ca819c3b..16ffcc71f6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -23,6 +23,8 @@ import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.planner.external.hudi.HudiPartitionMgr;
+import org.apache.doris.planner.external.hudi.HudiPartitionProcessor;
 
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
@@ -41,9 +43,11 @@ public class ExternalMetaCacheMgr {
     private static final Logger LOG = 
LogManager.getLogger(ExternalMetaCacheMgr.class);
 
     // catalog id -> HiveMetaStoreCache
-    private Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap();
+    private final Map<Long, HiveMetaStoreCache> cacheMap = 
Maps.newConcurrentMap();
     // catalog id -> table schema cache
     private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap();
+    // hudi partition manager
+    private final HudiPartitionMgr hudiPartitionMgr;
     private ExecutorService executor;
 
     public ExternalMetaCacheMgr() {
@@ -51,6 +55,7 @@ public class ExternalMetaCacheMgr {
                 Config.max_external_cache_loader_thread_pool_size,
                 Config.max_external_cache_loader_thread_pool_size * 1000,
                 "ExternalMetaCacheMgr", 120, true);
+        hudiPartitionMgr = HudiPartitionMgr.get(executor);
     }
 
     public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
@@ -79,13 +84,18 @@ public class ExternalMetaCacheMgr {
         return cache;
     }
 
-    public void removeCache(String catalogId) {
+    public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog 
catalog) {
+        return hudiPartitionMgr.getPartitionProcessor(catalog);
+    }
+
+    public void removeCache(long catalogId) {
         if (cacheMap.remove(catalogId) != null) {
-            LOG.info("remove hive metastore cache for catalog {}" + catalogId);
+            LOG.info("remove hive metastore cache for catalog {}", catalogId);
         }
         if (schemaCacheMap.remove(catalogId) != null) {
-            LOG.info("remove schema cache for catalog {}" + catalogId);
+            LOG.info("remove schema cache for catalog {}", catalogId);
         }
+        hudiPartitionMgr.removePartitionProcessor(catalogId);
     }
 
     public void invalidateTableCache(long catalogId, String dbName, String 
tblName) {
@@ -98,6 +108,7 @@ public class ExternalMetaCacheMgr {
         if (metaCache != null) {
             metaCache.invalidateTableCache(dbName, tblName);
         }
+        hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
         LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, 
tblName, catalogId);
     }
 
@@ -111,6 +122,7 @@ public class ExternalMetaCacheMgr {
         if (metaCache != null) {
             metaCache.invalidateDbCache(dbName);
         }
+        hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
         LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
     }
 
@@ -123,6 +135,7 @@ public class ExternalMetaCacheMgr {
         if (metaCache != null) {
             metaCache.invalidateAll();
         }
+        hudiPartitionMgr.cleanPartitionProcess(catalogId);
         LOG.debug("invalid catalog cache for {}", catalogId);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 7a7dd76ab7..09ed6361e5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -252,9 +252,10 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
 
             // set hdfs params for hdfs file type.
             Map<String, String> locationProperties = getLocationProperties();
-            if (fileFormatType == TFileFormatType.FORMAT_JNI) {
+            if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType 
== TFileType.FILE_S3) {
                 scanRangeParams.setProperties(locationProperties);
-            } else if (locationType == TFileType.FILE_HDFS || locationType == 
TFileType.FILE_BROKER) {
+            }
+            if (locationType == TFileType.FILE_HDFS || locationType == 
TFileType.FILE_BROKER) {
                 String fsName = getFsName(fileSplit);
                 THdfsParams tHdfsParams = 
HdfsResource.generateHdfsParam(locationProperties);
                 tHdfsParams.setFsName(fsName);
@@ -267,8 +268,6 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
                     }
                     scanRangeParams.addToBrokerAddresses(new 
TNetworkAddress(broker.host, broker.port));
                 }
-            } else if (locationType == TFileType.FILE_S3) {
-                scanRangeParams.setProperties(locationProperties);
             }
 
             TScanRangeLocations curLocations = newLocations(scanRangeParams);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java
new file mode 100644
index 0000000000..bcc967501c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.planner.ColumnBound;
+import org.apache.doris.planner.ListPartitionPrunerV2;
+import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import lombok.Data;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+@Data
+public class TablePartitionValues {
+    public static final String HIVE_DEFAULT_PARTITION = 
"__HIVE_DEFAULT_PARTITION__";
+
+    private final ReadWriteLock readWriteLock;
+    private long lastUpdateTimestamp;
+    private long nextPartitionId;
+    private final Map<Long, PartitionItem> idToPartitionItem;
+    private final Map<String, Long> partitionNameToIdMap;
+    private final Map<Long, String> partitionIdToNameMap;
+
+    private Map<Long, List<UniqueId>> idToUniqueIdsMap;
+    private Map<Long, List<String>> partitionValuesMap;
+    //multi pair
+    private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange;
+    private Map<Range<PartitionKey>, UniqueId> rangeToId;
+    //single pair
+    private RangeMap<ColumnBound, UniqueId> singleColumnRangeMap;
+    private Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap;
+
+    public TablePartitionValues() {
+        readWriteLock = new ReentrantReadWriteLock();
+        lastUpdateTimestamp = 0;
+        nextPartitionId = 0;
+        idToPartitionItem = new HashMap<>();
+        partitionNameToIdMap = new HashMap<>();
+        partitionIdToNameMap = new HashMap<>();
+    }
+
+    public TablePartitionValues(List<String> partitionNames, 
List<List<String>> partitionValues, List<Type> types) {
+        this();
+        addPartitions(partitionNames, partitionValues, types);
+    }
+
+    public TablePartitionValues(List<String> partitionNames, List<Type> types) 
{
+        this();
+        addPartitions(partitionNames, types);
+    }
+
+    public void addPartitions(List<String> partitionNames, List<List<String>> 
partitionValues, List<Type> types) {
+        Preconditions.checkState(partitionNames.size() == 
partitionValues.size());
+        List<String> addPartitionNames = new ArrayList<>();
+        List<PartitionItem> addPartitionItems = new ArrayList<>();
+        partitionNameToIdMap.forEach((partitionName, partitionId) -> {
+            addPartitionNames.add(partitionName);
+            addPartitionItems.add(idToPartitionItem.get(partitionId));
+        });
+
+        for (int i = 0; i < partitionNames.size(); i++) {
+            if (!partitionNameToIdMap.containsKey(partitionNames.get(i))) {
+                addPartitionNames.add(partitionNames.get(i));
+                
addPartitionItems.add(toListPartitionItem(partitionValues.get(i), types));
+            }
+        }
+        cleanPartitions();
+
+        addPartitionItems(addPartitionNames, addPartitionItems, types);
+    }
+
+    public void addPartitions(List<String> partitionNames, List<Type> types) {
+        addPartitions(partitionNames,
+                
partitionNames.stream().map(this::getHivePartitionValues).collect(Collectors.toList()),
 types);
+    }
+
+    private void addPartitionItems(List<String> partitionNames, 
List<PartitionItem> partitionItems, List<Type> types) {
+        Preconditions.checkState(partitionNames.size() == 
partitionItems.size());
+        Preconditions.checkState(nextPartitionId == 0);
+        for (int i = 0; i < partitionNames.size(); i++) {
+            long partitionId = nextPartitionId++;
+            idToPartitionItem.put(partitionId, partitionItems.get(i));
+            partitionNameToIdMap.put(partitionNames.get(i), partitionId);
+            partitionIdToNameMap.put(partitionId, partitionNames.get(i));
+        }
+
+        // create a new map for partitionId <---> uniqueId
+        idToUniqueIdsMap = new HashMap<>();
+
+        if (types.size() > 1) {
+            // uidToPartitionRange and rangeToId are only used for 
multi-column partition
+            uidToPartitionRange = 
ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem, 
idToUniqueIdsMap);
+            rangeToId = 
ListPartitionPrunerV2.genRangeToId(uidToPartitionRange);
+        } else {
+            Preconditions.checkState(types.size() == 1);
+            // singleColumnRangeMap is only used for single-column partition
+            singleColumnRangeMap = 
ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem, 
idToUniqueIdsMap);
+            singleUidToColumnRangeMap = 
ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap);
+        }
+        partitionValuesMap = 
ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem);
+    }
+
+    public void dropPartitions(List<String> partitionNames, List<Type> types) {
+        partitionNames.forEach(p -> {
+            Long removedPartition = partitionNameToIdMap.get(p);
+            if (removedPartition != null) {
+                idToPartitionItem.remove(removedPartition);
+            }
+        });
+        List<String> remainingPartitionNames = new ArrayList<>();
+        List<PartitionItem> remainingPartitionItems = new ArrayList<>();
+        partitionNameToIdMap.forEach((partitionName, partitionId) -> {
+            remainingPartitionNames.add(partitionName);
+            remainingPartitionItems.add(idToPartitionItem.get(partitionId));
+        });
+        cleanPartitions();
+        addPartitionItems(remainingPartitionNames, remainingPartitionItems, 
types);
+    }
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    public Lock readLock() {
+        return readWriteLock.readLock();
+    }
+
+    public Lock writeLock() {
+        return readWriteLock.writeLock();
+    }
+
+    private void cleanPartitions() {
+        nextPartitionId = 0;
+        idToPartitionItem.clear();
+        partitionNameToIdMap.clear();
+        partitionIdToNameMap.clear();
+
+        idToUniqueIdsMap = null;
+        partitionValuesMap = null;
+        uidToPartitionRange = null;
+        rangeToId = null;
+        singleColumnRangeMap = null;
+        singleUidToColumnRangeMap = null;
+    }
+
+    private ListPartitionItem toListPartitionItem(List<String> 
partitionValues, List<Type> types) {
+        Preconditions.checkState(partitionValues.size() == types.size());
+        try {
+            PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(
+                    partitionValues.stream().map(p -> new PartitionValue(p, 
HIVE_DEFAULT_PARTITION.equals(p)))
+                            .collect(Collectors.toList()),
+                    types);
+            return new ListPartitionItem(Lists.newArrayList(key));
+        } catch (AnalysisException e) {
+            throw new CacheException("failed to convert partition %s to list 
partition",
+                    e, partitionValues);
+        }
+    }
+
+    private List<String> getHivePartitionValues(String partitionName) {
+        // Partition name will be in format: nation=cn/city=beijing
+        // parse it to get values "cn" and "beijing"
+        return Arrays.stream(partitionName.split("/")).map(part -> {
+            String[] kv = part.split("=");
+            Preconditions.checkState(kv.length == 2, partitionName);
+            String partitionValue;
+            try {
+                // hive partition value maybe contains special characters like 
'=' and '/'
+                partitionValue = URLDecoder.decode(kv[1], 
StandardCharsets.UTF_8.name());
+            } catch (UnsupportedEncodingException e) {
+                // It should not be here
+                throw new RuntimeException(e);
+            }
+            return partitionValue;
+        }).collect(Collectors.toList());
+    }
+
+    @Data
+    public static class TablePartitionKey {
+        private String dbName;
+        private String tblName;
+        // not in key
+        private List<Type> types;
+
+        public TablePartitionKey(String dbName, String tblName, List<Type> 
types) {
+            this.dbName = dbName;
+            this.tblName = tblName;
+            this.types = types;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof TablePartitionKey)) {
+                return false;
+            }
+            return dbName.equals(((TablePartitionKey) obj).dbName)
+                    && tblName.equals(((TablePartitionKey) obj).tblName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dbName, tblName);
+        }
+
+        @Override
+        public String toString() {
+            return "TablePartitionKey{" + "dbName='" + dbName + '\'' + ", 
tblName='" + tblName + '\'' + '}';
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java
new file mode 100644
index 0000000000..ab6a8839b5
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.hudi;
+
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.Config;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.planner.external.TablePartitionValues;
+import 
org.apache.doris.planner.external.TablePartitionValues.TablePartitionKey;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
+    private final long catalogId;
+    private final LoadingCache<TablePartitionKey, TablePartitionValues> 
partitionCache;
+
+    public HudiCachedPartitionProcessor(long catalogId, Executor executor) {
+        this.catalogId = catalogId;
+        this.partitionCache = 
CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num)
+                
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, 
TimeUnit.MINUTES)
+                .build(CacheLoader.asyncReloading(
+                        new CacheLoader<TablePartitionKey, 
TablePartitionValues>() {
+                            @Override
+                            public TablePartitionValues load(TablePartitionKey 
key) throws Exception {
+                                return new TablePartitionValues();
+                            }
+                        }, executor));
+    }
+
+    @Override
+    public void cleanUp() {
+        partitionCache.cleanUp();
+    }
+
+    @Override
+    public void cleanDatabasePartitions(String dbName) {
+        partitionCache.asMap().keySet().stream().filter(k -> 
k.getDbName().equals(dbName)).collect(Collectors.toList())
+                .forEach(partitionCache::invalidate);
+
+    }
+
+    @Override
+    public void cleanTablePartitions(String dbName, String tblName) {
+        partitionCache.asMap().keySet().stream()
+                .filter(k -> k.getDbName().equals(dbName) && 
k.getTblName().equals(tblName))
+                .collect(Collectors.toList())
+                .forEach(partitionCache::invalidate);
+    }
+
+    public TablePartitionValues getPartitionValues(HMSExternalTable table, 
HoodieTableMetaClient tableMetaClient)
+            throws CacheException {
+        assert (catalogId == table.getCatalog().getId());
+        Option<String[]> partitionColumns = 
tableMetaClient.getTableConfig().getPartitionFields();
+        if (!partitionColumns.isPresent()) {
+            return null;
+        }
+        HoodieTimeline timeline = 
tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+        Option<HoodieInstant> lastInstant = timeline.lastInstant();
+        if (!lastInstant.isPresent()) {
+            return null;
+        }
+        try {
+            long lastTimestamp = 
Long.parseLong(lastInstant.get().getTimestamp());
+            TablePartitionValues partitionValues = partitionCache.get(
+                    new TablePartitionKey(table.getDbName(), table.getName(), 
table.getPartitionColumnTypes()));
+            partitionValues.readLock().lock();
+            try {
+                long lastUpdateTimestamp = 
partitionValues.getLastUpdateTimestamp();
+                if (lastTimestamp == lastUpdateTimestamp) {
+                    return partitionValues;
+                }
+                assert (lastTimestamp > lastUpdateTimestamp);
+            } finally {
+                partitionValues.readLock().unlock();
+            }
+
+            partitionValues.writeLock().lock();
+            try {
+                long lastUpdateTimestamp = 
partitionValues.getLastUpdateTimestamp();
+                if (lastTimestamp == lastUpdateTimestamp) {
+                    return partitionValues;
+                }
+                assert (lastTimestamp > lastUpdateTimestamp);
+                List<String> partitionNames;
+                if (lastUpdateTimestamp == 0) {
+                    partitionNames = getAllPartitionNames(tableMetaClient);
+                } else {
+                    partitionNames = getPartitionNamesInRange(timeline, 
String.valueOf(lastUpdateTimestamp),
+                            String.valueOf(lastTimestamp));
+                }
+                List<String> partitionColumnsList = 
Arrays.asList(partitionColumns.get());
+                partitionValues.addPartitions(partitionNames,
+                        partitionNames.stream().map(p -> 
parsePartitionValues(partitionColumnsList, p))
+                                .collect(Collectors.toList()), 
table.getPartitionColumnTypes());
+                partitionValues.setLastUpdateTimestamp(lastTimestamp);
+                return partitionValues;
+            } finally {
+                partitionValues.writeLock().unlock();
+            }
+        } catch (Exception e) {
+            throw new CacheException("Failed to get hudi partitions", e);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionMgr.java
new file mode 100644
index 0000000000..4956a6f58c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionMgr.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.hudi;
+
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.HMSExternalCatalog;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+public class HudiPartitionMgr {
+    private static volatile HudiPartitionMgr partitionMgr = null;
+
+    private static final Map<Long, HudiPartitionProcessor> partitionProcessors 
= Maps.newConcurrentMap();
+    private final Executor executor;
+
+    private HudiPartitionMgr(Executor executor) {
+        this.executor = executor;
+    }
+
+    public HudiPartitionProcessor getPartitionProcessor(ExternalCatalog 
catalog) {
+        return partitionProcessors.computeIfAbsent(catalog.getId(), catalogId 
-> {
+            if (catalog instanceof HMSExternalCatalog) {
+                return new HudiCachedPartitionProcessor(catalogId, executor);
+            } else {
+                throw new RuntimeException("Hudi only supports hive(or 
compatible) catalog now");
+            }
+        });
+    }
+
+    public void removePartitionProcessor(long catalogId) {
+        HudiPartitionProcessor processor = 
partitionProcessors.remove(catalogId);
+        if (processor != null) {
+            processor.cleanUp();
+        }
+    }
+
+    public void cleanPartitionProcess(long catalogId) {
+        HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
+        if (processor != null) {
+            processor.cleanUp();
+        }
+    }
+
+    public void cleanDatabasePartitions(long catalogId, String dbName) {
+        HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
+        if (processor != null) {
+            processor.cleanDatabasePartitions(dbName);
+        }
+    }
+
+    public void cleanTablePartitions(long catalogId, String dbName, String 
tblName) {
+        HudiPartitionProcessor processor = partitionProcessors.get(catalogId);
+        if (processor != null) {
+            processor.cleanTablePartitions(dbName, tblName);
+        }
+    }
+
+    public static HudiPartitionMgr get(Executor executor) {
+        if (partitionMgr == null) {
+            synchronized (HudiPartitionMgr.class) {
+                if (partitionMgr == null) {
+                    partitionMgr = new HudiPartitionMgr(executor);
+                }
+            }
+        }
+        return partitionMgr;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java
new file mode 100644
index 0000000000..3be3e1f080
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.hudi;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class HudiPartitionProcessor {
+
+    public abstract void cleanUp();
+
+    public abstract void cleanDatabasePartitions(String dbName);
+
+    public abstract void cleanTablePartitions(String dbName, String tblName);
+
+    public String[] getPartitionColumns(HoodieTableMetaClient tableMetaClient) 
{
+        return tableMetaClient.getTableConfig().getPartitionFields().get();
+    }
+
+    public List<String> getAllPartitionNames(HoodieTableMetaClient 
tableMetaClient) throws IOException {
+        TypedProperties configProperties = new TypedProperties();
+        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+                .fromProperties(configProperties)
+                
.enable(configProperties.getBoolean(HoodieMetadataConfig.ENABLE.key(),
+                        
HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS)
+                        && 
HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient))
+                .build();
+
+        HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create(
+                new HoodieLocalEngineContext(tableMetaClient.getHadoopConf()), 
metadataConfig,
+                tableMetaClient.getBasePathV2().toString(),
+                FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), 
true);
+
+        return 
newTableMetadata.getPartitionPathWithPathPrefixes(Collections.singletonList(""));
+    }
+
+    public List<String> getPartitionNamesInRange(HoodieTimeline timeline, 
String startTimestamp, String endTimestamp) {
+        return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths(
+                timeline.findInstantsInRange(startTimestamp, 
endTimestamp).getInstants().stream()
+                        .map(instant -> {
+                            try {
+                                return 
TimelineUtils.getCommitMetadata(instant, timeline);
+                            } catch (IOException e) {
+                                throw new RuntimeException(e.getMessage(), e);
+                            }
+                        }).collect(Collectors.toList())));
+    }
+
+    public static List<String> parsePartitionValues(List<String> 
partitionColumns, String partitionPath) {
+        if (partitionColumns.size() == 0) {
+            // This is a non-partitioned table
+            return Collections.emptyList();
+        }
+        String[] partitionFragments = partitionPath.split("/");
+        if (partitionFragments.length != partitionColumns.size()) {
+            if (partitionColumns.size() == 1) {
+                // If the partition column size is not equal to the partition 
fragment size
+                // and the partition column size is 1, we map the whole 
partition path
+                // to the partition column which can benefit from the 
partition prune.
+                String prefix = partitionColumns.get(0) + "=";
+                String partitionValue;
+                if (partitionPath.startsWith(prefix)) {
+                    // support hive style partition path
+                    partitionValue = partitionPath.substring(prefix.length());
+                } else {
+                    partitionValue = partitionPath;
+                }
+                // TODO: In hive, the specific characters like '=', '/' will 
be url encoded
+                return Collections.singletonList(partitionValue);
+            } else {
+                // If the partition column size is not equal to the partition 
fragments size
+                // and the partition column size > 1, we do not know how to 
map the partition
+                // fragments to the partition columns and therefore return an 
empty tuple. We don't
+                // fail outright so that in some cases we can fallback to 
reading the table as non-partitioned
+                // one
+                throw new RuntimeException("Failed to parse partition values 
of path: " + partitionPath);
+            }
+        } else {
+            // If partitionSeqs.length == partitionSchema.fields.length
+            // Append partition name to the partition value if the
+            // HIVE_STYLE_PARTITIONING is disable.
+            // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
+            List<String> partitionValues = new 
ArrayList<>(partitionFragments.length);
+            for (int i = 0; i < partitionFragments.length; i++) {
+                String prefix = partitionColumns.get(i) + "=";
+                if (partitionFragments[i].startsWith(prefix)) {
+                    
partitionValues.add(partitionFragments[i].substring(prefix.length()));
+                } else {
+                    partitionValues.add(partitionFragments[i]);
+                }
+            }
+            return partitionValues;
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
index 3c4fb0d1fa..734b3943a9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java
@@ -18,16 +18,21 @@
 package org.apache.doris.planner.external.hudi;
 
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.HudiUtils;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.external.FileSplit;
 import org.apache.doris.planner.external.HiveScanNode;
 import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.planner.external.TablePartitionValues;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -36,10 +41,12 @@ import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.THudiFileDesc;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
+import com.google.common.collect.Lists;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
@@ -54,7 +61,10 @@ import org.apache.hudi.common.util.Option;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -137,6 +147,53 @@ public class HudiScanNode extends HiveScanNode {
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
 
+    private List<HivePartition> getPrunedPartitions(HoodieTableMetaClient 
metaClient) throws AnalysisException {
+        List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes();
+        if (!partitionColumnTypes.isEmpty()) {
+            HudiCachedPartitionProcessor processor = 
(HudiCachedPartitionProcessor) Env.getCurrentEnv()
+                    
.getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog());
+            TablePartitionValues partitionValues = 
processor.getPartitionValues(hmsTable, metaClient);
+            if (partitionValues != null) {
+                // 2. prune partitions by expr
+                partitionValues.readLock().lock();
+                try {
+                    Map<Long, PartitionItem> idToPartitionItem = 
partitionValues.getIdToPartitionItem();
+                    this.totalPartitionNum = idToPartitionItem.size();
+                    ListPartitionPrunerV2 pruner = new 
ListPartitionPrunerV2(idToPartitionItem,
+                            hmsTable.getPartitionColumns(), columnNameToRange,
+                            partitionValues.getUidToPartitionRange(),
+                            partitionValues.getRangeToId(),
+                            partitionValues.getSingleColumnRangeMap(),
+                            true);
+                    Collection<Long> filteredPartitionIds = pruner.prune();
+                    this.readPartitionNum = filteredPartitionIds.size();
+                    // 3. get partitions from cache
+                    String dbName = hmsTable.getDbName();
+                    String tblName = hmsTable.getName();
+                    String inputFormat = 
hmsTable.getRemoteTable().getSd().getInputFormat();
+                    String basePath = metaClient.getBasePathV2().toString();
+                    Map<Long, String> partitionIdToNameMap = 
partitionValues.getPartitionIdToNameMap();
+                    Map<Long, List<String>> partitionValuesMap = 
partitionValues.getPartitionValuesMap();
+                    return filteredPartitionIds.stream().map(id -> {
+                        String path = basePath + "/" + 
partitionIdToNameMap.get(id);
+                        return new HivePartition(
+                                dbName, tblName, false, inputFormat, path, 
partitionValuesMap.get(id));
+                    }).collect(Collectors.toList());
+                } finally {
+                    partitionValues.readLock().unlock();
+                }
+            }
+        }
+        // unpartitioned table, create a dummy partition to save location and 
inputformat,
+        // so that we can unify the interface.
+        HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), 
hmsTable.getName(), true,
+                hmsTable.getRemoteTable().getSd().getInputFormat(),
+                hmsTable.getRemoteTable().getSd().getLocation(), null);
+        this.totalPartitionNum = 1;
+        this.readPartitionNum = 1;
+        return Lists.newArrayList(dummyPartition);
+    }
+
     @Override
     public List<Split> getSplits() throws UserException {
         HoodieTableMetaClient hudiClient = 
HiveMetaStoreClientHelper.getHudiClient(hmsTable);
@@ -173,13 +230,30 @@ public class HudiScanNode extends HiveScanNode {
         }
 
         HoodieTimeline timeline = 
hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
-        Option<HoodieInstant> latestInstant = timeline.lastInstant();
-        if (!latestInstant.isPresent()) {
-            return new ArrayList<>();
+        String queryInstant;
+        if (desc.getRef().getTableSnapshot() != null) {
+            queryInstant = desc.getRef().getTableSnapshot().getTime();
+        } else {
+            Option<HoodieInstant> snapshotInstant = timeline.lastInstant();
+            if (!snapshotInstant.isPresent()) {
+                return Collections.emptyList();
+            }
+            queryInstant = snapshotInstant.get().getTimestamp();
         }
-        String queryInstant = latestInstant.get().getTimestamp();
         // Non partition table will get one dummy partition
-        List<HivePartition> partitions = getPartitions();
+        UserGroupInformation ugi = 
HiveMetaStoreClientHelper.getUserGroupInformation(
+                HiveMetaStoreClientHelper.getConfiguration(hmsTable));
+        List<HivePartition> partitions;
+        if (ugi != null) {
+            try {
+                partitions = ugi.doAs(
+                        (PrivilegedExceptionAction<List<HivePartition>>) () -> 
getPrunedPartitions(hudiClient));
+            } catch (Exception e) {
+                throw new UserException(e);
+            }
+        } else {
+            partitions = getPrunedPartitions(hudiClient);
+        }
         try {
             for (HivePartition partition : partitions) {
                 String globPath;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
