This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f4e5727c076 branch-3.0: [feat](mtmv)mtmv support paimon partition 
refresh #43959 #44911 (#45660)
f4e5727c076 is described below

commit f4e5727c0767c779a9dda0b45877de205c78d5eb
Author: zhangdong <zhangd...@selectdb.com>
AuthorDate: Tue Dec 24 19:26:18 2024 +0800

    branch-3.0: [feat](mtmv)mtmv support paimon partition refresh #43959 #44911 
(#45660)
    
    pick: #44911 #43959
    
    Only the Paimon-related code is picked; the code related to MTMV REFRESH is not picked.
---
 .../apache/doris/datasource/ExternalCatalog.java   |   9 +-
 .../doris/datasource/ExternalMetaCacheMgr.java     |  12 +
 .../doris/datasource/ExternalSchemaCache.java      |   6 +-
 .../org/apache/doris/datasource/ExternalTable.java |   9 +-
 .../doris/datasource/hive/HMSExternalTable.java    |   7 +-
 .../maxcompute/MaxComputeExternalTable.java        |   2 +-
 .../MvccUtil.java}                                 |  45 ++--
 .../datasource/paimon/PaimonExternalTable.java     | 228 +++++++++--------
 .../datasource/paimon/PaimonMetadataCache.java     | 144 +++++++++++
 .../datasource/paimon/PaimonMetadataCacheMgr.java  |  49 ++++
 ...hemaCacheValue.java => PaimonMvccSnapshot.java} |  21 +-
 .../doris/datasource/paimon/PaimonPartition.java   |  61 +++++
 ...emaCacheValue.java => PaimonPartitionInfo.java} |  31 ++-
 ...imonSchemaCacheValue.java => PaimonSchema.java} |  29 ++-
 .../datasource/paimon/PaimonSchemaCacheKey.java    |  55 +++++
 .../datasource/paimon/PaimonSchemaCacheValue.java  |  12 +-
 ...onSchemaCacheValue.java => PaimonSnapshot.java} |  25 +-
 .../datasource/paimon/PaimonSnapshotCacheKey.java  |  75 ++++++
 ...cheValue.java => PaimonSnapshotCacheValue.java} |  24 +-
 .../apache/doris/datasource/paimon/PaimonUtil.java | 275 +++++++++++++++++++++
 .../datasource/paimon/source/PaimonSource.java     |   3 +-
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |  14 ++
 .../org/apache/doris/nereids/StatementContext.java |  26 +-
 .../rules/rewrite/PruneFileScanPartition.java      |   9 +-
 .../plans/commands/UpdateMvByPartitionCommand.java |   7 +
 .../trees/plans/logical/LogicalFileScan.java       |   7 +-
 .../java/org/apache/doris/mtmv/PaimonUtilTest.java |  71 ++++++
 regression-test/data/mtmv_p0/test_paimon_mtmv.out  |   9 -
 .../suites/mtmv_p0/test_paimon_mtmv.groovy         |  62 -----
 29 files changed, 1038 insertions(+), 289 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index c8ca21e88ef..d1df51177fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.es.EsExternalDatabase;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalDatabase;
@@ -453,13 +454,13 @@ public abstract class ExternalCatalog
         }
     }
 
-    public final Optional<SchemaCacheValue> getSchema(String dbName, String 
tblName) {
+    public final Optional<SchemaCacheValue> getSchema(SchemaCacheKey key) {
         makeSureInitialized();
-        Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(dbName);
+        Optional<ExternalDatabase<? extends ExternalTable>> db = 
getDb(key.getDbName());
         if (db.isPresent()) {
-            Optional<? extends ExternalTable> table = 
db.get().getTable(tblName);
+            Optional<? extends ExternalTable> table = 
db.get().getTable(key.getTblName());
             if (table.isPresent()) {
-                return table.get().initSchemaAndUpdateTime();
+                return table.get().initSchemaAndUpdateTime(key);
             }
         }
         return Optional.empty();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index cc40ad292ce..24f55e74266 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -31,6 +31,8 @@ import 
org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
 import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
 import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
 import org.apache.doris.datasource.metacache.MetaCache;
+import org.apache.doris.datasource.paimon.PaimonMetadataCache;
+import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
 import org.apache.doris.fs.FileSystemCache;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 
@@ -92,6 +94,7 @@ public class ExternalMetaCacheMgr {
     private ExternalRowCountCache rowCountCache;
     private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
     private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
+    private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
 
     public ExternalMetaCacheMgr() {
         rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
@@ -122,6 +125,7 @@ public class ExternalMetaCacheMgr {
         hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor);
         icebergMetadataCacheMgr = new 
IcebergMetadataCacheMgr(commonRefreshExecutor);
         maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
+        paimonMetadataCacheMgr = new 
PaimonMetadataCacheMgr(commonRefreshExecutor);
     }
 
     public ExecutorService getFileListingExecutor() {
@@ -167,6 +171,10 @@ public class ExternalMetaCacheMgr {
         return icebergMetadataCacheMgr.getIcebergMetadataCache();
     }
 
+    public PaimonMetadataCache getPaimonMetadataCache() {
+        return paimonMetadataCacheMgr.getPaimonMetadataCache();
+    }
+
     public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
         return 
maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId);
     }
@@ -189,6 +197,7 @@ public class ExternalMetaCacheMgr {
         hudiPartitionMgr.removePartitionProcessor(catalogId);
         icebergMetadataCacheMgr.removeCache(catalogId);
         maxComputeMetadataCacheMgr.removeCache(catalogId);
+        paimonMetadataCacheMgr.removeCache(catalogId);
     }
 
     public void invalidateTableCache(long catalogId, String dbName, String 
tblName) {
@@ -204,6 +213,7 @@ public class ExternalMetaCacheMgr {
         hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
         icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, 
tblName);
         maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, 
tblName);
+        paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, 
tblName);
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, 
tblName, catalogId);
         }
@@ -222,6 +232,7 @@ public class ExternalMetaCacheMgr {
         hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
         icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
         maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
+        paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid db cache for {} in catalog {}", dbName, 
catalogId);
         }
@@ -239,6 +250,7 @@ public class ExternalMetaCacheMgr {
         hudiPartitionMgr.cleanPartitionProcess(catalogId);
         icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
         maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
+        paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
         if (LOG.isDebugEnabled()) {
             LOG.debug("invalid catalog cache for {}", catalogId);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index a0558766e81..de3eeff75d9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -74,7 +74,7 @@ public class ExternalSchemaCache {
     }
 
     private Optional<SchemaCacheValue> loadSchema(SchemaCacheKey key) {
-        Optional<SchemaCacheValue> schema = catalog.getSchema(key.dbName, 
key.tblName);
+        Optional<SchemaCacheValue> schema = catalog.getSchema(key);
         if (LOG.isDebugEnabled()) {
             LOG.debug("load schema for {} in catalog {}", key, 
catalog.getName());
         }
@@ -83,6 +83,10 @@ public class ExternalSchemaCache {
 
     public Optional<SchemaCacheValue> getSchemaValue(String dbName, String 
tblName) {
         SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
+        return getSchemaValue(key);
+    }
+
+    public Optional<SchemaCacheValue> getSchemaValue(SchemaCacheKey key) {
         return schemaCache.get(key);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index bd1e36e7bc9..91df061678f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import 
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -317,8 +318,12 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
      *
      * @return
      */
-    public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
+    public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey 
key) {
         schemaUpdateTime = System.currentTimeMillis();
+        return initSchema(key);
+    }
+
+    public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
         return initSchema();
     }
 
@@ -399,7 +404,7 @@ public class ExternalTable implements TableIf, Writable, 
GsonPostProcessable {
      * @param snapshot if not support mvcc, ignore this
      * @return partitionName ==> PartitionItem
      */
-    protected Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+    public Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
         return Collections.emptyMap();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index ad685386ec9..da4670d6d05 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -30,6 +30,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.TablePartitionValues;
@@ -330,7 +331,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     }
 
     @Override
-    protected Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+    public Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
         return getNameToPartitionItems();
     }
 
@@ -501,6 +502,10 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     }
 
     @Override
+    public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey 
key) {
+        return initSchemaAndUpdateTime();
+    }
+
     public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
         org.apache.hadoop.hive.metastore.api.Table table = 
((HMSExternalCatalog) catalog).getClient()
                 .getTable(dbName, name);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
index 0f748f59e92..dbbbcf2d6a1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
@@ -92,7 +92,7 @@ public class MaxComputeExternalTable extends ExternalTable {
     }
 
     @Override
-    protected Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+    public Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
         if (getPartitionColumns().isEmpty()) {
             return Collections.emptyMap();
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
 b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java
similarity index 50%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java
index aaaefe7f32d..ffdaff770e2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java
@@ -15,25 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.datasource.paimon;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
-
-import org.apache.paimon.table.Table;
-
-import java.util.List;
-
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
-
-    private Table paimonTable;
-
-    public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
-        super(schema);
-        this.paimonTable = paimonTable;
-    }
-
-    public Table getPaimonTable() {
-        return paimonTable;
+package org.apache.doris.datasource.mvcc;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Optional;
+
+public class MvccUtil {
+    /**
+     * get Snapshot From StatementContext
+     *
+     * @param tableIf
+     * @return MvccSnapshot
+     */
+    public static Optional<MvccSnapshot> getSnapshotFromContext(TableIf 
tableIf) {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext == null) {
+            return Optional.empty();
+        }
+        StatementContext statementContext = 
connectContext.getStatementContext();
+        if (statementContext == null) {
+            return Optional.empty();
+        }
+        return statementContext.getSnapshot(tableIf);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index c9eaf1b7df3..7b59d879d93 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -18,10 +18,16 @@
 package org.apache.doris.datasource.paimon;
 
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.ExternalAnalysisTask;
@@ -30,30 +36,36 @@ import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.table.system.SchemasTable;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DecimalType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.RowType;
 
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
+import java.util.Set;
 
-public class PaimonExternalTable extends ExternalTable {
+public class PaimonExternalTable extends ExternalTable implements MvccTable {
 
     private static final Logger LOG = 
LogManager.getLogger(PaimonExternalTable.class);
 
+    private final Table paimonTable;
+
     public PaimonExternalTable(long id, String name, String dbName, 
PaimonExternalCatalog catalog) {
         super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE);
+        this.paimonTable = catalog.getPaimonTable(dbName, name);
     }
 
     public String getPaimonCatalogType() {
@@ -67,99 +79,27 @@ public class PaimonExternalTable extends ExternalTable {
         }
     }
 
-    public Table getPaimonTable() {
-        makeSureInitialized();
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) 
value).getPaimonTable()).orElse(null);
+    public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
+        return paimonTable.copy(
+                Collections.singletonMap(CoreOptions.SCAN_VERSION.key(),
+                        
String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId())));
     }
 
-    @Override
-    public Optional<SchemaCacheValue> initSchema() {
-        Table paimonTable = ((PaimonExternalCatalog) 
catalog).getPaimonTable(dbName, name);
-        TableSchema schema = ((FileStoreTable) paimonTable).schema();
-        List<DataField> columns = schema.fields();
-        List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(columns.size());
-        for (DataField field : columns) {
-            tmpSchema.add(new Column(field.name().toLowerCase(),
-                    paimonTypeToDorisType(field.type()), true, null, true, 
field.description(), true,
-                    field.id()));
-        }
-        return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable));
-    }
-
-    private Type 
paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
-        int tsScale = 3; // default
-        switch (dataType.getTypeRoot()) {
-            case BOOLEAN:
-                return Type.BOOLEAN;
-            case INTEGER:
-                return Type.INT;
-            case BIGINT:
-                return Type.BIGINT;
-            case FLOAT:
-                return Type.FLOAT;
-            case DOUBLE:
-                return Type.DOUBLE;
-            case SMALLINT:
-                return Type.SMALLINT;
-            case TINYINT:
-                return Type.TINYINT;
-            case VARCHAR:
-            case BINARY:
-            case CHAR:
-            case VARBINARY:
-                return Type.STRING;
-            case DECIMAL:
-                DecimalType decimal = (DecimalType) dataType;
-                return ScalarType.createDecimalV3Type(decimal.getPrecision(), 
decimal.getScale());
-            case DATE:
-                return ScalarType.createDateV2Type();
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                if (dataType instanceof org.apache.paimon.types.TimestampType) 
{
-                    tsScale = ((org.apache.paimon.types.TimestampType) 
dataType).getPrecision();
-                    if (tsScale > 6) {
-                        tsScale = 6;
-                    }
-                } else if (dataType instanceof 
org.apache.paimon.types.LocalZonedTimestampType) {
-                    tsScale = 
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
-                    if (tsScale > 6) {
-                        tsScale = 6;
-                    }
-                }
-                return ScalarType.createDatetimeV2Type(tsScale);
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                if (dataType instanceof 
org.apache.paimon.types.LocalZonedTimestampType) {
-                    tsScale = 
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
-                    if (tsScale > 6) {
-                        tsScale = 6;
-                    }
-                }
-                return ScalarType.createDatetimeV2Type(tsScale);
-            case ARRAY:
-                ArrayType arrayType = (ArrayType) dataType;
-                Type innerType = 
paimonPrimitiveTypeToDorisType(arrayType.getElementType());
-                return org.apache.doris.catalog.ArrayType.create(innerType, 
true);
-            case MAP:
-                MapType mapType = (MapType) dataType;
-                return new org.apache.doris.catalog.MapType(
-                        paimonTypeToDorisType(mapType.getKeyType()), 
paimonTypeToDorisType(mapType.getValueType()));
-            case ROW:
-                RowType rowType = (RowType) dataType;
-                List<DataField> fields = rowType.getFields();
-                return new org.apache.doris.catalog.StructType(fields.stream()
-                        .map(field -> new 
org.apache.doris.catalog.StructField(field.name(),
-                                paimonTypeToDorisType(field.type())))
-                        .collect(Collectors.toCollection(ArrayList::new)));
-            case TIME_WITHOUT_TIME_ZONE:
-                return Type.UNSUPPORTED;
-            default:
-                LOG.warn("Cannot transform unknown type: " + 
dataType.getTypeRoot());
-                return Type.UNSUPPORTED;
+    public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) {
+        ExternalSchemaCache cache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+                new PaimonSchemaCacheKey(dbName, name, schemaId));
+        if (!schemaCacheValue.isPresent()) {
+            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                    null, catalog.getName(), dbName, name, schemaId);
         }
+        return (PaimonSchemaCacheValue) schemaCacheValue.get();
     }
 
-    protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType 
type) {
-        return paimonPrimitiveTypeToDorisType(type);
+    private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() {
+        makeSureInitialized();
+        return 
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
+                .getPaimonSnapshot(catalog, dbName, name);
     }
 
     @Override
@@ -189,13 +129,6 @@ public class PaimonExternalTable extends ExternalTable {
     public long fetchRowCount() {
         makeSureInitialized();
         long rowCount = 0;
-        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
-        Table paimonTable = schemaCacheValue.map(value -> 
((PaimonSchemaCacheValue) value).getPaimonTable())
-                .orElse(null);
-        if (paimonTable == null) {
-            LOG.info("Paimon table {} is null.", name);
-            return UNKNOWN_ROW_COUNT;
-        }
         List<Split> splits = 
paimonTable.newReadBuilder().newScan().plan().splits();
         for (Split split : splits) {
             rowCount += split.rowCount();
@@ -205,4 +138,87 @@ public class PaimonExternalTable extends ExternalTable {
         }
         return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
     }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        return getPaimonSchemaCacheValue(snapshot).getPartitionColumns();
+    }
+
+    @Override
+    public MvccSnapshot loadSnapshot() {
+        return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue());
+    }
+
+    @Override
+    public Map<String, PartitionItem> 
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return 
getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
+    }
+
+    @Override
+    public boolean supportInternalPartitionPruned() {
+        return true;
+    }
+
+    @Override
+    public List<Column> getFullSchema() {
+        return 
getPaimonSchemaCacheValue(MvccUtil.getSnapshotFromContext(this)).getSchema();
+    }
+
+    @Override
+    public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
+        makeSureInitialized();
+        PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key;
+        try {
+            PaimonSchema schema = 
loadPaimonSchemaBySchemaId(paimonSchemaCacheKey);
+            List<DataField> columns = schema.getFields();
+            List<Column> dorisColumns = 
Lists.newArrayListWithCapacity(columns.size());
+            Set<String> partitionColumnNames = 
Sets.newHashSet(schema.getPartitionKeys());
+            List<Column> partitionColumns = Lists.newArrayList();
+            for (DataField field : columns) {
+                Column column = new Column(field.name().toLowerCase(),
+                        PaimonUtil.paimonTypeToDorisType(field.type()), true, 
null, true, field.description(), true,
+                        field.id());
+                dorisColumns.add(column);
+                if (partitionColumnNames.contains(field.name())) {
+                    partitionColumns.add(column);
+                }
+            }
+            return Optional.of(new PaimonSchemaCacheValue(dorisColumns, 
partitionColumns));
+        } catch (Exception e) {
+            throw new CacheException("failed to initSchema for: %s.%s.%s.%s",
+                    null, getCatalog().getName(), key.getDbName(), 
key.getTblName(),
+                    paimonSchemaCacheKey.getSchemaId());
+        }
+    }
+
+    private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) 
throws IOException {
+        Table table = ((PaimonExternalCatalog) 
getCatalog()).getPaimonTable(key.getDbName(),
+                name + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS);
+        PredicateBuilder builder = new PredicateBuilder(table.rowType());
+        Predicate predicate = builder.equal(0, key.getSchemaId());
+        // Adding predicates will also return excess data
+        List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, {1}, 
{2}}, predicate);
+        for (InternalRow row : rows) {
+            PaimonSchema schema = PaimonUtil.rowToSchema(row);
+            if (schema.getSchemaId() == key.getSchemaId()) {
+                return schema;
+            }
+        }
+        throw new CacheException("failed to initSchema for: %s.%s.%s.%s",
+                null, getCatalog().getName(), key.getDbName(), 
key.getTblName(), key.getSchemaId());
+    }
+
+    private PaimonSchemaCacheValue 
getPaimonSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
+        PaimonSnapshotCacheValue snapshotCacheValue = 
getOrFetchSnapshotCacheValue(snapshot);
+        return 
getPaimonSchemaCacheValue(snapshotCacheValue.getSnapshot().getSchemaId());
+    }
+
+    private PaimonSnapshotCacheValue 
getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
+        if (snapshot.isPresent()) {
+            return ((PaimonMvccSnapshot) 
snapshot.get()).getSnapshotCacheValue();
+        } else {
+            return getPaimonSnapshotCacheValue();
+        }
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
new file mode 100644
index 00000000000..5b711e07066
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.common.Config;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.PartitionsTable;
+import org.apache.paimon.table.system.SnapshotsTable;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Loading cache that maps a Paimon table (catalog, database, table name) to its latest
+ * snapshot together with the partition information loaded alongside that snapshot.
+ */
+public class PaimonMetadataCache {
+
+    private final LoadingCache<PaimonSnapshotCacheKey, PaimonSnapshotCacheValue> snapshotCache;
+
+    /**
+     * Builds the snapshot cache.
+     *
+     * @param executor executor handed to the cache factory
+     */
+    public PaimonMetadataCache(ExecutorService executor) {
+        // 28800 seconds is 8 hours. NOTE(review): the meaning of each positional argument is
+        // defined by CacheFactory, which is not visible here - confirm before tuning.
+        OptionalLong fixedDurationSec = OptionalLong.of(28800L);
+        OptionalLong configuredDurationSec =
+                OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60);
+        CacheFactory factory = new CacheFactory(
+                fixedDurationSec,
+                configuredDurationSec,
+                Config.max_external_table_cache_num,
+                true,
+                null);
+        this.snapshotCache = factory.buildCache(this::loadSnapshot, null, executor);
+    }
+
+    /**
+     * Cache loader: reads the latest snapshot, resolves the partition columns of the schema
+     * that snapshot refers to, and loads the matching partition information.
+     *
+     * @param key table to load
+     * @return the freshly loaded cache value
+     * @throws CacheException wrapping any {@link IOException} or {@link AnalysisException}
+     */
+    @NotNull
+    private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) {
+        try {
+            PaimonSnapshot latest = loadLatestSnapshot(key);
+            PaimonExternalTable paimonTable = (PaimonExternalTable) key.getCatalog()
+                    .getDbOrAnalysisException(key.getDbName())
+                    .getTableOrAnalysisException(key.getTableName());
+            List<Column> partitionColumns = paimonTable.getPaimonSchemaCacheValue(latest.getSchemaId())
+                    .getPartitionColumns();
+            return new PaimonSnapshotCacheValue(loadPartitionInfo(key, partitionColumns), latest);
+        } catch (IOException | AnalysisException e) {
+            throw new CacheException("failed to loadSnapshot for: %s.%s.%s",
+                    e, key.getCatalog().getName(), key.getDbName(), key.getTableName());
+        }
+    }
+
+    /**
+     * Returns an empty partition info when there are no partition columns; otherwise reads the
+     * partitions and lets {@code PaimonUtil.generatePartitionInfo} build the result.
+     */
+    private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List<Column> partitionColumns)
+            throws IOException, AnalysisException {
+        return CollectionUtils.isEmpty(partitionColumns)
+                ? new PaimonPartitionInfo()
+                : PaimonUtil.generatePartitionInfo(partitionColumns, loadPartitions(key));
+    }
+
+    /**
+     * Reads every row of the table's "partitions" system table (no projection, no predicate)
+     * and converts each row with {@code PaimonUtil.rowToPartition}.
+     */
+    private List<PaimonPartition> loadPartitions(PaimonSnapshotCacheKey key)
+            throws IOException {
+        PaimonExternalCatalog paimonCatalog = (PaimonExternalCatalog) key.getCatalog();
+        Table partitionsTable = paimonCatalog.getPaimonTable(key.getDbName(),
+                key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS);
+        List<InternalRow> partitionRows = PaimonUtil.read(partitionsTable, null, null);
+        List<PaimonPartition> partitions = Lists.newArrayListWithCapacity(partitionRows.size());
+        for (InternalRow partitionRow : partitionRows) {
+            partitions.add(PaimonUtil.rowToPartition(partitionRow));
+        }
+        return partitions;
+    }
+
+    /**
+     * Scans the table's "snapshots" system table and keeps the row with the largest snapshot id.
+     * When no row has a snapshot id above 0 the result is {@code PaimonSnapshot(0, 0)}.
+     */
+    private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException {
+        PaimonExternalCatalog paimonCatalog = (PaimonExternalCatalog) key.getCatalog();
+        Table snapshotsTable = paimonCatalog.getPaimonTable(key.getDbName(),
+                key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS);
+        // Projected column 0 is read as the snapshot id, projected column 1 as the schema id.
+        int[][] idColumns = {{0}, {1}};
+        long maxSnapshotId = 0L;
+        long schemaIdOfMax = 0L;
+        for (InternalRow snapshotRow : PaimonUtil.read(snapshotsTable, idColumns, null)) {
+            long currentId = snapshotRow.getLong(0);
+            if (currentId > maxSnapshotId) {
+                maxSnapshotId = currentId;
+                schemaIdOfMax = snapshotRow.getLong(1);
+            }
+        }
+        return new PaimonSnapshot(maxSnapshotId, schemaIdOfMax);
+    }
+
+    /** Invalidates every cached entry whose catalog id equals {@code catalogId}. */
+    public void invalidateCatalogCache(long catalogId) {
+        for (PaimonSnapshotCacheKey cachedKey : snapshotCache.asMap().keySet()) {
+            if (cachedKey.getCatalog().getId() == catalogId) {
+                snapshotCache.invalidate(cachedKey);
+            }
+        }
+    }
+
+    /** Invalidates the cached entries that match the given catalog id, database and table name. */
+    public void invalidateTableCache(long catalogId, String dbName, String tblName) {
+        for (PaimonSnapshotCacheKey cachedKey : snapshotCache.asMap().keySet()) {
+            boolean sameTable = cachedKey.getCatalog().getId() == catalogId
+                    && cachedKey.getDbName().equals(dbName)
+                    && cachedKey.getTableName().equals(tblName);
+            if (sameTable) {
+                snapshotCache.invalidate(cachedKey);
+            }
+        }
+    }
+
+    /** Invalidates the cached entries that match the given catalog id and database name. */
+    public void invalidateDbCache(long catalogId, String dbName) {
+        for (PaimonSnapshotCacheKey cachedKey : snapshotCache.asMap().keySet()) {
+            if (cachedKey.getCatalog().getId() == catalogId && cachedKey.getDbName().equals(dbName)) {
+                snapshotCache.invalidate(cachedKey);
+            }
+        }
+    }
+
+    /**
+     * Returns the cached snapshot value for a table, loading it through the cache on a miss.
+     *
+     * @param catalog catalog the table belongs to
+     * @param dbName database name
+     * @param tbName table name
+     * @return the cache value for that table
+     */
+    public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbName, String tbName) {
+        return snapshotCache.get(new PaimonSnapshotCacheKey(catalog, dbName, tbName));
+    }
+
+    /** Returns the statistics of the snapshot cache under the key {@code paimon_snapshot_cache}. */
+    public Map<String, Map<String, String>> getCacheStats() {
+        Map<String, Map<String, String>> statsByCache = Maps.newHashMap();
+        Map<String, String> snapshotStats =
+                ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), snapshotCache.estimatedSize());
+        statsByCache.put("paimon_snapshot_cache", snapshotStats);
+        return statsByCache;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
new file mode 100644
index 00000000000..a282fde665b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Owns a single {@link PaimonMetadataCache} and forwards catalog-, database- and
+ * table-level invalidation requests to it.
+ */
+public class PaimonMetadataCacheMgr {
+
+    // Assigned once in the constructor and never replaced, hence final.
+    private final PaimonMetadataCache paimonMetadataCache;
+
+    /**
+     * Creates the manager and its underlying cache.
+     *
+     * @param executor executor passed to the {@link PaimonMetadataCache} constructor
+     */
+    public PaimonMetadataCacheMgr(ExecutorService executor) {
+        this.paimonMetadataCache = new PaimonMetadataCache(executor);
+    }
+
+    /** Returns the managed cache instance. */
+    public PaimonMetadataCache getPaimonMetadataCache() {
+        return paimonMetadataCache;
+    }
+
+    /**
+     * Drops the cached entries of one catalog.
+     * Performs the same call as {@link #invalidateCatalogCache(long)}.
+     *
+     * @param catalogId id of the catalog whose entries are dropped
+     */
+    public void removeCache(long catalogId) {
+        paimonMetadataCache.invalidateCatalogCache(catalogId);
+    }
+
+    /**
+     * Invalidates the cached entries of one catalog.
+     *
+     * @param catalogId id of the catalog whose entries are invalidated
+     */
+    public void invalidateCatalogCache(long catalogId) {
+        paimonMetadataCache.invalidateCatalogCache(catalogId);
+    }
+
+    /**
+     * Invalidates the cached entries of one table.
+     *
+     * @param catalogId id of the catalog the table belongs to
+     * @param dbName database name
+     * @param tblName table name
+     */
+    public void invalidateTableCache(long catalogId, String dbName, String tblName) {
+        paimonMetadataCache.invalidateTableCache(catalogId, dbName, tblName);
+    }
+
+    /**
+     * Invalidates the cached entries of one database.
+     *
+     * @param catalogId id of the catalog the database belongs to
+     * @param dbName database name
+     */
+    public void invalidateDbCache(long catalogId, String dbName) {
+        paimonMetadataCache.invalidateDbCache(catalogId, dbName);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java
similarity index 65%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java
index aaaefe7f32d..2307e91adb3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java
@@ -17,23 +17,16 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
 
-import org.apache.paimon.table.Table;
+/**
+ * {@link MvccSnapshot} implementation for Paimon that carries a
+ * {@link PaimonSnapshotCacheValue} and hands it back unchanged.
+ */
+public class PaimonMvccSnapshot implements MvccSnapshot {
+    private final PaimonSnapshotCacheValue snapshotCacheValue;
 
-import java.util.List;
-
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
-
-    private Table paimonTable;
-
-    public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
-        super(schema);
-        this.paimonTable = paimonTable;
+    // Stores the given cache value as-is; no copy and no null check is made.
+    public PaimonMvccSnapshot(PaimonSnapshotCacheValue snapshotCacheValue) {
+        this.snapshotCacheValue = snapshotCacheValue;
     }
 
-    public Table getPaimonTable() {
-        return paimonTable;
+    // Returns the cache value supplied at construction time.
+    public PaimonSnapshotCacheValue getSnapshotCacheValue() {
+        return snapshotCacheValue;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java
new file mode 100644
index 00000000000..545448199b3
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+/**
+ * Immutable holder for one row of the Paimon "partitions" system table.
+ * See https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table
+ */
+public class PaimonPartition {
+    /** Partition values, for example: [1, dd]. */
+    private final String partitionValues;
+    /** The amount of data in the partition. */
+    private final long recordCount;
+    /** Partition file size. */
+    private final long fileSizeInBytes;
+    /** Number of partition files. */
+    private final long fileCount;
+    /** Last update time of partition. */
+    private final long lastUpdateTime;
+
+    /**
+     * Creates a partition descriptor from the individual column values.
+     *
+     * @param partitionValues partition values, for example: [1, dd]
+     * @param recordCount the amount of data in the partition
+     * @param fileSizeInBytes partition file size
+     * @param fileCount number of partition files
+     * @param lastUpdateTime last update time of the partition
+     */
+    public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount,
+            long lastUpdateTime) {
+        this.lastUpdateTime = lastUpdateTime;
+        this.fileCount = fileCount;
+        this.fileSizeInBytes = fileSizeInBytes;
+        this.recordCount = recordCount;
+        this.partitionValues = partitionValues;
+    }
+
+    /** Returns the partition values string. */
+    public String getPartitionValues() {
+        return this.partitionValues;
+    }
+
+    /** Returns the record count of the partition. */
+    public long getRecordCount() {
+        return this.recordCount;
+    }
+
+    /** Returns the partition file size in bytes. */
+    public long getFileSizeInBytes() {
+        return this.fileSizeInBytes;
+    }
+
+    /** Returns the number of files in the partition. */
+    public long getFileCount() {
+        return this.fileCount;
+    }
+
+    /** Returns the last update time of the partition. */
+    public long getLastUpdateTime() {
+        return this.lastUpdateTime;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
similarity index 50%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
index aaaefe7f32d..4d3326f8e48 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
@@ -17,23 +17,32 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.catalog.PartitionItem;
 
-import org.apache.paimon.table.Table;
+import com.google.common.collect.Maps;
 
-import java.util.List;
+import java.util.Map;
 
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
+/**
+ * Pairs two maps that share String keys: one to Doris {@link PartitionItem}s and one to
+ * {@link PaimonPartition}s. Judging by the field names the keys are partition names.
+ */
+public class PaimonPartitionInfo {
+    private final Map<String, PartitionItem> nameToPartitionItem;
+    private final Map<String, PaimonPartition> nameToPartition;
 
-    private Table paimonTable;
+    // Creates an instance backed by two new, empty hash maps.
+    public PaimonPartitionInfo() {
+        this.nameToPartitionItem = Maps.newHashMap();
+        this.nameToPartition = Maps.newHashMap();
+    }
+
+    // Stores the supplied maps directly; they are not copied.
+    public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
+            Map<String, PaimonPartition> nameToPartition) {
+        this.nameToPartitionItem = nameToPartitionItem;
+        this.nameToPartition = nameToPartition;
+    }
 
-    public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
-        super(schema);
-        this.paimonTable = paimonTable;
+    // Returns the internal map itself, not a copy.
+    public Map<String, PartitionItem> getNameToPartitionItem() {
+        return nameToPartitionItem;
     }
 
-    public Table getPaimonTable() {
-        return paimonTable;
+    // Returns the internal map itself, not a copy.
+    public Map<String, PaimonPartition> getNameToPartition() {
+        return nameToPartition;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
 b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
similarity index 59%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
index aaaefe7f32d..ef26e1ed208 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
@@ -17,23 +17,30 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
-
-import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
 
 import java.util.List;
 
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
+/**
+ * Immutable holder for one Paimon schema version: its id, its fields and its partition keys.
+ */
+public class PaimonSchema {
+    private final long schemaId;
+    private final List<DataField> fields;
+    private final List<String> partitionKeys;
+
+    // Stores the supplied lists directly; they are not copied.
+    public PaimonSchema(long schemaId, List<DataField> fields, List<String> 
partitionKeys) {
+        this.schemaId = schemaId;
+        this.fields = fields;
+        this.partitionKeys = partitionKeys;
+    }
 
-    private Table paimonTable;
+    // Returns the schema id given at construction time.
+    public long getSchemaId() {
+        return schemaId;
+    }
 
-    public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
-        super(schema);
-        this.paimonTable = paimonTable;
+    // Returns the internal field list itself, not a copy.
+    public List<DataField> getFields() {
+        return fields;
     }
 
-    public Table getPaimonTable() {
-        return paimonTable;
+    // Returns the internal partition-key list itself, not a copy.
+    public List<String> getPartitionKeys() {
+        return partitionKeys;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java
new file mode 100644
index 00000000000..f74555b369b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
+
+import com.google.common.base.Objects;
+
+/**
+ * Schema cache key that extends the database/table key of {@link SchemaCacheKey}
+ * with a Paimon schema id.
+ */
+public class PaimonSchemaCacheKey extends SchemaCacheKey {
+    private final long schemaId;
+
+    /**
+     * @param dbName database name, passed to the parent key
+     * @param tableName table name, passed to the parent key
+     * @param schemaId Paimon schema id that distinguishes this key
+     */
+    public PaimonSchemaCacheKey(String dbName, String tableName, long schemaId) {
+        super(dbName, tableName);
+        this.schemaId = schemaId;
+    }
+
+    /** Returns the schema id this key refers to. */
+    public long getSchemaId() {
+        return schemaId;
+    }
+
+    /**
+     * Two keys are equal when the other object is a {@code PaimonSchemaCacheKey}, the parent
+     * class considers them equal, and both carry the same schema id.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        return o instanceof PaimonSchemaCacheKey
+                && super.equals(o)
+                && ((PaimonSchemaCacheKey) o).schemaId == schemaId;
+    }
+
+    /** Combines the parent's hash code with the schema id. */
+    @Override
+    public int hashCode() {
+        int parentHash = super.hashCode();
+        return Objects.hashCode(parentHash, schemaId);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
index aaaefe7f32d..ccb530a3cbc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
@@ -20,20 +20,18 @@ package org.apache.doris.datasource.paimon;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.datasource.SchemaCacheValue;
 
-import org.apache.paimon.table.Table;
-
 import java.util.List;
 
 public class PaimonSchemaCacheValue extends SchemaCacheValue {
 
-    private Table paimonTable;
+    // Partition columns kept next to the full schema held by the parent class.
+    private List<Column> partitionColumns;
 
-    public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
+    // Passes the schema to the parent and stores the partition column list without copying it.
+    public PaimonSchemaCacheValue(List<Column> schema, List<Column> 
partitionColumns) {
         super(schema);
-        this.paimonTable = paimonTable;
+        this.partitionColumns = partitionColumns;
     }
 
-    public Table getPaimonTable() {
-        return paimonTable;
+    // Returns the partition column list given at construction time (the same list instance).
+    public List<Column> getPartitionColumns() {
+        return partitionColumns;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
similarity index 65%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
index aaaefe7f32d..4a536dd72cc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
@@ -17,23 +17,20 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
+/**
+ * Immutable pair of a Paimon snapshot id and the schema id recorded with that snapshot.
+ */
+public class PaimonSnapshot {
+    private final long snapshotId;
+    private final long schemaId;
 
-import org.apache.paimon.table.Table;
-
-import java.util.List;
-
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
-
-    private Table paimonTable;
+    // Stores both ids as given; no validation is performed.
+    public PaimonSnapshot(long snapshotId, long schemaId) {
+        this.snapshotId = snapshotId;
+        this.schemaId = schemaId;
+    }
 
-    public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
-        super(schema);
-        this.paimonTable = paimonTable;
+    // Returns the snapshot id given at construction time.
+    public long getSnapshotId() {
+        return snapshotId;
     }
 
-    public Table getPaimonTable() {
-        return paimonTable;
+    // Returns the schema id given at construction time.
+    public long getSchemaId() {
+        return schemaId;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
new file mode 100644
index 00000000000..970f111a721
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.CatalogIf;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+/**
+ * Cache key identifying one table by catalog, database name and table name.
+ * Equality and hashing use the catalog's id rather than the catalog object itself.
+ */
+public class PaimonSnapshotCacheKey {
+    private final CatalogIf catalog;
+    private final String dbName;
+    private final String tableName;
+
+    /**
+     * @param catalog catalog the table belongs to
+     * @param dbName database name
+     * @param tableName table name
+     */
+    public PaimonSnapshotCacheKey(CatalogIf catalog, String dbName, String tableName) {
+        this.catalog = catalog;
+        this.dbName = dbName;
+        this.tableName = tableName;
+    }
+
+    /** Returns the catalog given at construction time. */
+    public CatalogIf getCatalog() {
+        return catalog;
+    }
+
+    /** Returns the database name. */
+    public String getDbName() {
+        return dbName;
+    }
+
+    /** Returns the table name. */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * Keys are equal when they are of exactly the same class, their catalogs have the same id,
+     * and their database and table names are equal (null-safe).
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        PaimonSnapshotCacheKey other = (PaimonSnapshotCacheKey) o;
+        if (catalog.getId() != other.catalog.getId()) {
+            return false;
+        }
+        return Objects.equals(dbName, other.dbName) && Objects.equals(tableName, other.tableName);
+    }
+
+    /** Hashes the catalog id, database name and table name, matching {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(catalog.getId(), dbName, tableName);
+    }
+
+    /** Renders the key as {@code PaimonSnapshotCacheKey[catalog=..., dbName='...', tableName='...']}. */
+    @Override
+    public String toString() {
+        StringJoiner joiner = new StringJoiner(", ", PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]");
+        joiner.add("catalog=" + catalog);
+        joiner.add("dbName='" + dbName + "'");
+        joiner.add("tableName='" + tableName + "'");
+        return joiner.toString();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java
similarity index 64%
copy from 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to 
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java
index aaaefe7f32d..c50ecdabfde 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java
@@ -17,23 +17,21 @@
 
 package org.apache.doris.datasource.paimon;
 
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
+/**
+ * Immutable cache value that bundles a {@link PaimonSnapshot} with the
+ * {@link PaimonPartitionInfo} stored together with it.
+ */
+public class PaimonSnapshotCacheValue {
 
-import org.apache.paimon.table.Table;
+    private final PaimonPartitionInfo partitionInfo;
+    private final PaimonSnapshot snapshot;
 
-import java.util.List;
-
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
-
-    private Table paimonTable;
+    // Stores both references as given; no copy and no null check is made.
+    public PaimonSnapshotCacheValue(PaimonPartitionInfo partitionInfo, 
PaimonSnapshot snapshot) {
+        this.partitionInfo = partitionInfo;
+        this.snapshot = snapshot;
+    }
 
-    public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
-        super(schema);
-        this.paimonTable = paimonTable;
+    // Returns the partition info given at construction time.
+    public PaimonPartitionInfo getPartitionInfo() {
+        return partitionInfo;
     }
 
-    public Table getPaimonTable() {
-        return paimonTable;
+    // Returns the snapshot given at construction time.
+    public PaimonSnapshot getSnapshot() {
+        return snapshot;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
new file mode 100644
index 00000000000..1f7576dca51
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.hive.HiveUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Projection;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+public class PaimonUtil {
+    private static final Logger LOG = LogManager.getLogger(PaimonUtil.class);
+
+    public static List<InternalRow> read(
+            Table table, @Nullable int[][] projection, @Nullable Predicate 
predicate,
+            Pair<ConfigOption<?>, String>... dynamicOptions)
+            throws IOException {
+        Map<String, String> options = new HashMap<>();
+        for (Pair<ConfigOption<?>, String> pair : dynamicOptions) {
+            options.put(pair.getKey().key(), pair.getValue());
+        }
+        table = table.copy(options);
+        ReadBuilder readBuilder = table.newReadBuilder();
+        if (projection != null) {
+            readBuilder.withProjection(projection);
+        }
+        if (predicate != null) {
+            readBuilder.withFilter(predicate);
+        }
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+        InternalRowSerializer serializer =
+                new InternalRowSerializer(
+                        projection == null
+                                ? table.rowType()
+                                : 
Projection.of(projection).project(table.rowType()));
+        List<InternalRow> rows = new ArrayList<>();
+        reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
+        return rows;
+    }
+
+
+    /*
+    
https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table
+    
+---------------+----------------+--------------------+--------------------+------------------------+
+    |  partition    |   record_count |  file_size_in_bytes|          
file_count|        last_update_time|
+    
+---------------+----------------+--------------------+--------------------+------------------------+
+    |  [1]          |           1    |             645    |                1   
| 2024-06-24 10:25:57.400|
+    
+---------------+----------------+--------------------+--------------------+------------------------+
+    org.apache.paimon.table.system.PartitionsTable.TABLE_TYPE
+    public static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(0, "partition", 
SerializationUtils.newStringType(true)),
+                            new DataField(1, "record_count", new 
BigIntType(false)),
+                            new DataField(2, "file_size_in_bytes", new 
BigIntType(false)),
+                            new DataField(3, "file_count", new 
BigIntType(false)),
+                            new DataField(4, "last_update_time", 
DataTypes.TIMESTAMP_MILLIS())));
+    */
+    public static PaimonPartition rowToPartition(InternalRow row) {
+        String partition = row.getString(0).toString();
+        long recordCount = row.getLong(1);
+        long fileSizeInBytes = row.getLong(2);
+        long fileCount = row.getLong(3);
+        long lastUpdateTime = row.getTimestamp(4, 3).getMillisecond();
+        return new PaimonPartition(partition, recordCount, fileSizeInBytes, 
fileCount, lastUpdateTime);
+    }
+
+    public static PaimonPartitionInfo generatePartitionInfo(List<Column> 
partitionColumns,
+            List<PaimonPartition> paimonPartitions) throws AnalysisException {
+        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+        Map<String, PaimonPartition> nameToPartition = Maps.newHashMap();
+        PaimonPartitionInfo partitionInfo = new 
PaimonPartitionInfo(nameToPartitionItem, nameToPartition);
+        if (CollectionUtils.isEmpty(partitionColumns)) {
+            return partitionInfo;
+        }
+        for (PaimonPartition paimonPartition : paimonPartitions) {
+            String partitionName = getPartitionName(partitionColumns, 
paimonPartition.getPartitionValues());
+            nameToPartition.put(partitionName, paimonPartition);
+            nameToPartitionItem.put(partitionName, 
toListPartitionItem(partitionName, partitionColumns));
+        }
+        return partitionInfo;
+    }
+
+    private static String getPartitionName(List<Column> partitionColumns, 
String partitionValueStr) {
+        Preconditions.checkNotNull(partitionValueStr);
+        String[] partitionValues = partitionValueStr.replace("[", 
"").replace("]", "")
+                .split(",");
+        Preconditions.checkState(partitionColumns.size() == 
partitionValues.length);
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < partitionColumns.size(); ++i) {
+            if (i != 0) {
+                sb.append("/");
+            }
+            
sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]);
+        }
+        return sb.toString();
+    }
+
+    public static ListPartitionItem toListPartitionItem(String partitionName, 
List<Column> partitionColumns)
+            throws AnalysisException {
+        List<Type> types = partitionColumns.stream()
+                .map(Column::getType)
+                .collect(Collectors.toList());
+        // Partition name will be in format: nation=cn/city=beijing
+        // parse it to get values "cn" and "beijing"
+        List<String> partitionValues = 
HiveUtil.toPartitionValues(partitionName);
+        Preconditions.checkState(partitionValues.size() == types.size(), 
partitionName + " vs. " + types);
+        List<PartitionValue> values = 
Lists.newArrayListWithExpectedSize(types.size());
+        for (String partitionValue : partitionValues) {
+            // null  will be in partition 'null'
+            // "null" will be in partition 'null'
+            // NULL  will be in partition 'null'
+            // "NULL" will be in partition 'NULL'
+            // values.add(new PartitionValue(partitionValue, 
"null".equals(partitionValue)));
+            values.add(new PartitionValue(partitionValue, false));
+        }
+        PartitionKey key = 
PartitionKey.createListPartitionKeyWithTypes(values, types, true);
+        ListPartitionItem listPartitionItem = new 
ListPartitionItem(Lists.newArrayList(key));
+        return listPartitionItem;
+    }
+
+    private static Type 
paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
+        int tsScale = 3; // default
+        switch (dataType.getTypeRoot()) {
+            case BOOLEAN:
+                return Type.BOOLEAN;
+            case INTEGER:
+                return Type.INT;
+            case BIGINT:
+                return Type.BIGINT;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case SMALLINT:
+                return Type.SMALLINT;
+            case TINYINT:
+                return Type.TINYINT;
+            case VARCHAR:
+            case BINARY:
+            case CHAR:
+            case VARBINARY:
+                return Type.STRING;
+            case DECIMAL:
+                DecimalType decimal = (DecimalType) dataType;
+                return ScalarType.createDecimalV3Type(decimal.getPrecision(), 
decimal.getScale());
+            case DATE:
+                return ScalarType.createDateV2Type();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                if (dataType instanceof org.apache.paimon.types.TimestampType) 
{
+                    tsScale = ((org.apache.paimon.types.TimestampType) 
dataType).getPrecision();
+                    if (tsScale > 6) {
+                        tsScale = 6;
+                    }
+                } else if (dataType instanceof 
org.apache.paimon.types.LocalZonedTimestampType) {
+                    tsScale = 
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
+                    if (tsScale > 6) {
+                        tsScale = 6;
+                    }
+                }
+                return ScalarType.createDatetimeV2Type(tsScale);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (dataType instanceof 
org.apache.paimon.types.LocalZonedTimestampType) {
+                    tsScale = 
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
+                    if (tsScale > 6) {
+                        tsScale = 6;
+                    }
+                }
+                return ScalarType.createDatetimeV2Type(tsScale);
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) dataType;
+                Type innerType = 
paimonPrimitiveTypeToDorisType(arrayType.getElementType());
+                return org.apache.doris.catalog.ArrayType.create(innerType, 
true);
+            case MAP:
+                MapType mapType = (MapType) dataType;
+                return new org.apache.doris.catalog.MapType(
+                        paimonTypeToDorisType(mapType.getKeyType()), 
paimonTypeToDorisType(mapType.getValueType()));
+            case ROW:
+                RowType rowType = (RowType) dataType;
+                List<DataField> fields = rowType.getFields();
+                return new org.apache.doris.catalog.StructType(fields.stream()
+                        .map(field -> new 
org.apache.doris.catalog.StructField(field.name(),
+                                paimonTypeToDorisType(field.type())))
+                        .collect(Collectors.toCollection(ArrayList::new)));
+            case TIME_WITHOUT_TIME_ZONE:
+                return Type.UNSUPPORTED;
+            default:
+                LOG.warn("Cannot transform unknown type: " + 
dataType.getTypeRoot());
+                return Type.UNSUPPORTED;
+        }
+    }
+
+    public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType 
type) {
+        return paimonPrimitiveTypeToDorisType(type);
+    }
+
+    /**
+     * 
https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table
+     * demo:
+     * 0
+     * [{"id":0,"name":"user_id","type":"BIGINT NOT NULL"},
+     * {"id":1,"name":"item_id","type":"BIGINT"},
+     * {"id":2,"name":"behavior","type":"STRING"},
+     * {"id":3,"name":"dt","type":"STRING NOT NULL"},
+     * {"id":4,"name":"hh","type":"STRING NOT NULL"}]
+     * ["dt"]
+     * ["dt","hh","user_id"]
+     * {"owner":"hadoop","provider":"paimon"}
+     * 2024-12-03 15:38:14.734
+     *
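+     * @param row a row read from the Paimon schemas system table, laid out as in the demo above
+     * @return the PaimonSchema holding the row's schema id, fields and partition keys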
+     */
+    public static PaimonSchema rowToSchema(InternalRow row) {
+        long schemaId = row.getLong(0);
+        String fieldsStr = row.getString(1).toString();
+        String partitionKeysStr = row.getString(2).toString();
+        List<DataField> fields = JsonSerdeUtil.fromJson(fieldsStr, new 
TypeReference<List<DataField>>() {
+        });
+        List<String> partitionKeys = JsonSerdeUtil.fromJson(partitionKeysStr, 
new TypeReference<List<String>>() {
+        });
+        return new PaimonSchema(schemaId, fields, partitionKeys);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index 885eba06ed9..a8bb814f1d3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.property.constants.PaimonProperties;
 import org.apache.doris.thrift.TFileAttributes;
@@ -36,7 +37,7 @@ public class PaimonSource {
     public PaimonSource(TupleDescriptor desc) {
         this.desc = desc;
         this.paimonExtTable = (PaimonExternalTable) desc.getTable();
-        this.originTable = paimonExtTable.getPaimonTable();
+        this.originTable = 
paimonExtTable.getPaimonTable(MvccUtil.getSnapshotFromContext(paimonExtTable));
     }
 
     public TupleDescriptor getDesc() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 9d77ac6a6e6..42eee425269 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -30,6 +30,9 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccTable;
+import org.apache.doris.datasource.mvcc.MvccTableInfo;
 import org.apache.doris.job.common.TaskStatus;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.task.AbstractTask;
@@ -72,6 +75,7 @@ import java.math.RoundingMode;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -143,6 +147,8 @@ public class MTMVTask extends AbstractTask {
     private StmtExecutor executor;
     private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
 
+    private final Map<MvccTableInfo, MvccSnapshot> snapshots = 
Maps.newHashMap();
+
     public MTMVTask() {
     }
 
@@ -231,6 +237,9 @@ public class MTMVTask extends AbstractTask {
             throws Exception {
         ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
         StatementContext statementContext = new StatementContext();
+        for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
+            statementContext.setSnapshot(entry.getKey(), entry.getValue());
+        }
         ctx.setStatementContext(statementContext);
         TUniqueId queryId = generateQueryId();
         lastQueryId = DebugUtil.printId(queryId);
@@ -318,6 +327,11 @@ public class MTMVTask extends AbstractTask {
                 MTMVBaseTableIf baseTableIf = (MTMVBaseTableIf) tableIf;
                 baseTableIf.beforeMTMVRefresh(mtmv);
             }
+            if (tableIf instanceof MvccTable) {
+                MvccTable mvccTable = (MvccTable) tableIf;
+                MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot();
+                snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot);
+            }
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index d76451d9637..4cc4a6c8600 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -622,7 +622,11 @@ public class StatementContext implements Closeable {
     public void loadSnapshots() {
         for (TableIf tableIf : tables.values()) {
             if (tableIf instanceof MvccTable) {
-                snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) 
tableIf).loadSnapshot());
+                MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf);
+                // the snapshot may already have been set by MTMV; do not load it again
+                if (!snapshots.containsKey(mvccTableInfo)) {
+                    snapshots.put(mvccTableInfo, ((MvccTable) 
tableIf).loadSnapshot());
+                }
             }
         }
     }
@@ -630,11 +634,25 @@ public class StatementContext implements Closeable {
     /**
      * Obtain snapshot information of mvcc
      *
-     * @param mvccTable mvccTable
+     * @param tableIf tableIf
      * @return MvccSnapshot
      */
-    public MvccSnapshot getSnapshot(MvccTable mvccTable) {
-        return snapshots.get(new MvccTableInfo(mvccTable));
+    public Optional<MvccSnapshot> getSnapshot(TableIf tableIf) {
+        if (!(tableIf instanceof MvccTable)) {
+            return Optional.empty();
+        }
+        MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf);
+        return Optional.ofNullable(snapshots.get(mvccTableInfo));
+    }
+
+    /**
+     * Set snapshot information of mvcc
+     *
+     * @param mvccTableInfo mvccTableInfo
+     * @param snapshot snapshot
+     */
+    public void setSnapshot(MvccTableInfo mvccTableInfo, MvccSnapshot 
snapshot) {
+        snapshots.put(mvccTableInfo, snapshot);
     }
 
     private static class CloseableResource implements Closeable {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
index ba8b270d1f3..e99906f5e13 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
@@ -36,7 +36,6 @@ import org.apache.commons.collections.CollectionUtils;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -74,8 +73,8 @@ public class PruneFileScanPartition extends 
OneRewriteRuleFactory {
     private SelectedPartitions pruneExternalPartitions(ExternalTable 
externalTable,
             LogicalFilter<LogicalFileScan> filter, LogicalFileScan scan, 
CascadesContext ctx) {
         Map<String, PartitionItem> selectedPartitionItems = Maps.newHashMap();
-        // todo: real snapshotId
-        if 
(CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) {
+        if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(
+                ctx.getStatementContext().getSnapshot(externalTable)))) {
             // non partitioned table, return NOT_PRUNED.
             // non partition table will be handled in HiveScanNode.
             return SelectedPartitions.NOT_PRUNED;
@@ -83,8 +82,8 @@ public class PruneFileScanPartition extends 
OneRewriteRuleFactory {
         Map<String, Slot> scanOutput = scan.getOutput()
                 .stream()
                 .collect(Collectors.toMap(slot -> 
slot.getName().toLowerCase(), Function.identity()));
-        // todo: real snapshotId
-        List<Slot> partitionSlots = 
externalTable.getPartitionColumns(Optional.empty())
+        List<Slot> partitionSlots = externalTable.getPartitionColumns(
+                        ctx.getStatementContext().getSnapshot(externalTable))
                 .stream()
                 .map(column -> scanOutput.get(column.getName().toLowerCase()))
                 .collect(Collectors.toList());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index 36cc0f95a77..b0a95ffdd3a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -28,6 +28,8 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.nereids.analyzer.UnboundRelation;
@@ -316,6 +318,11 @@ public class UpdateMvByPartitionCommand extends 
InsertOverwriteTableCommand {
                             partitionHasDataItems.add(
                                     ((OlapTable) 
targetTable).getPartitionInfo().getItem(partition.getId()));
                         }
+                        if (targetTable instanceof ExternalTable) {
+                            // Add filter only when partition has data when 
external table
+                            partitionHasDataItems.add(((ExternalTable) 
targetTable).getNameToPartitionItems(
+                                    
MvccUtil.getSnapshotFromContext(targetTable)).get(partitionName));
+                        }
                     }
                     if (partitionHasDataItems.isEmpty()) {
                         predicates.setNeedAddFilter(false);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
index 96b8e032d11..1f5f71f7baf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.logical;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.trees.TableSample;
@@ -60,10 +61,10 @@ public class LogicalFileScan extends LogicalCatalogRelation 
{
     }
 
     public LogicalFileScan(RelationId id, ExternalTable table, List<String> 
qualifier,
-                           Optional<TableSample> tableSample, 
Optional<TableSnapshot> tableSnapshot) {
-        // todo: real snapshotId
+            Optional<TableSample> tableSample, Optional<TableSnapshot> 
tableSnapshot) {
         this(id, table, qualifier, Optional.empty(), Optional.empty(),
-                table.initSelectedPartitions(Optional.empty()), tableSample, 
tableSnapshot);
+                
table.initSelectedPartitions(MvccUtil.getSnapshotFromContext(table)),
+                tableSample, tableSnapshot);
     }
 
     public SelectedPartitions getSelectedPartitions() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java
new file mode 100644
index 00000000000..789af7bf835
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.paimon.PaimonPartition;
+import org.apache.doris.datasource.paimon.PaimonPartitionInfo;
+import org.apache.doris.datasource.paimon.PaimonUtil;
+
+import com.google.common.collect.Lists;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class PaimonUtilTest {
+
+    @Test
+    public void testGeneratePartitionInfo() throws AnalysisException {
+        Column k1 = new Column("k1", PrimitiveType.INT);
+        Column k2 = new Column("k2", PrimitiveType.VARCHAR);
+        List<Column> partitionColumns = Lists.newArrayList(k1, k2);
+        PaimonPartition p1 = new PaimonPartition("[1,aa]", 2, 3, 4, 5);
+        List<PaimonPartition> paimonPartitions = Lists.newArrayList(p1);
+        PaimonPartitionInfo partitionInfo = 
PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions);
+        String expectPartitionName = "k1=1/k2=aa";
+        
Assert.assertTrue(partitionInfo.getNameToPartitionItem().containsKey(expectPartitionName));
+        PartitionItem partitionItem = 
partitionInfo.getNameToPartitionItem().get(expectPartitionName);
+        List<PartitionKey> keys = partitionItem.getItems();
+        Assert.assertEquals(1, keys.size());
+        PartitionKey partitionKey = keys.get(0);
+        List<LiteralExpr> exprs = partitionKey.getKeys();
+        Assert.assertEquals(2, exprs.size());
+        Assert.assertEquals(1, exprs.get(0).getLongValue());
+        Assert.assertEquals("aa", exprs.get(1).getStringValue());
+    }
+
+    @Test
+    public void testRowToPartition() {
+        GenericRow row = GenericRow.of(BinaryString.fromString("[1,b]"), 2L, 
3L, 4L, Timestamp.fromEpochMillis(5L));
+        PaimonPartition paimonPartition = PaimonUtil.rowToPartition(row);
+        Assert.assertEquals("[1,b]", paimonPartition.getPartitionValues());
+        Assert.assertEquals(2L, paimonPartition.getRecordCount());
+        Assert.assertEquals(3L, paimonPartition.getFileSizeInBytes());
+        Assert.assertEquals(4L, paimonPartition.getFileCount());
+        Assert.assertEquals(5L, paimonPartition.getLastUpdateTime());
+    }
+}
diff --git a/regression-test/data/mtmv_p0/test_paimon_mtmv.out 
b/regression-test/data/mtmv_p0/test_paimon_mtmv.out
deleted file mode 100644
index c654cb01214..00000000000
--- a/regression-test/data/mtmv_p0/test_paimon_mtmv.out
+++ /dev/null
@@ -1,9 +0,0 @@
--- This file is automatically generated. You should know what you did if you 
want to edit this
--- !catalog --
-1      2       3       4       5       6       7       8       9.1     10.1    
11.10   2020-02-02      13str   14varchar       a       true    aaaa    
2023-08-13T09:32:38.530
-10     20      30      40      50      60      70      80      90.1    100.1   
110.10  2020-03-02      130str  140varchar      b       false   bbbb    
2023-08-14T08:32:52.821
-
--- !mtmv --
-1      2       3       4       5       6       7       8       9.1     10.1    
11.10   2020-02-02      13str   14varchar       a       true    aaaa    
2023-08-13T09:32:38.530
-10     20      30      40      50      60      70      80      90.1    100.1   
110.10  2020-03-02      130str  140varchar      b       false   bbbb    
2023-08-14T08:32:52.821
-
diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy 
b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy
deleted file mode 100644
index e84eb497b2c..00000000000
--- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-suite("test_paimon_mtmv", "p0,external,paimon,external_docker,external_docker_hive") {
-    String enabled = context.config.otherConfigs.get("enablePaimonTest")
-    logger.info("enabled: " + enabled)
-    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
-    logger.info("externalEnvIp: " + externalEnvIp)
-    String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
-    logger.info("hdfs_port: " + hdfs_port)
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
-        String catalog_name = "paimon_mtmv_catalog";
-        String mvName = "test_paimon_mtmv"
-        String dbName = "regression_test_mtmv_p0"
-        String paimonDb = "db1"
-        String paimonTable = "all_table"
-        sql """drop catalog if exists ${catalog_name} """
-
-        sql """create catalog if not exists ${catalog_name} properties (
-            "type" = "paimon",
-            "paimon.catalog.type"="filesystem",
-            "warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1"
-        );"""
-
-        order_qt_catalog """select * from ${catalog_name}.${paimonDb}.${paimonTable}"""
-        sql """drop materialized view if exists ${mvName};"""
-
-        sql """
-            CREATE MATERIALIZED VIEW ${mvName}
-                BUILD DEFERRED REFRESH AUTO ON MANUAL
-                DISTRIBUTED BY RANDOM BUCKETS 2
-                PROPERTIES ('replication_num' = '1')
-                AS
-                SELECT * FROM ${catalog_name}.${paimonDb}.${paimonTable};
-            """
-
-        sql """
-                REFRESH MATERIALIZED VIEW ${mvName} complete
-            """
-        def jobName = getJobName(dbName, mvName);
-        waitingMTMVTaskFinished(jobName)
-        order_qt_mtmv "SELECT * FROM ${mvName}"
-
-        sql """drop materialized view if exists ${mvName};"""
-        sql """ drop catalog if exists ${catalog_name} """
-    }
-}
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to