This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 02306b475bb [fix](iceberg)Use the correct schema for query (#50376)
02306b475bb is described below

commit 02306b475bb82df7cc496e9bb7d71fd1ccfad635
Author: wuwenchi <wuwen...@selectdb.com>
AuthorDate: Tue Apr 29 13:42:14 2025 +0800

    [fix](iceberg)Use the correct schema for query (#50376)
    
    ### What problem does this PR solve?
    
    Follow-up to #49956
    
    Problem Summary:
    
    When a snapshot is specified in the query, the schema corresponding to
    that snapshot should be used for parsing; otherwise, the schema of the
    latest snapshot should be used.
    
    1. When using the HMS catalog type, the executor pool also needs to be
    initialized.
    2. Set the thread pool size to the number of cores on the current
    machine.
    3. When no snapshot is specified, the latest schema is used.
    4. When a snapshot is specified, the schema corresponding to that
    snapshot is used (see the sketch after this list).
    5. When generating a scan node, save the schema information instead of
    re-reading it from the cache, so that a cache refresh cannot change the
    schema mid-query.
    6. When refreshing the schema, all cached schema versions of the
    related table need to be invalidated.
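    
    A sketch of points 3 and 4 (illustrative only, not part of the patch;
    the table name and snapshot id are hypothetical):
    
        -- parsed with the schema of the latest snapshot
        SELECT * FROM ice_tbl;
        -- parsed with the schema associated with snapshot 123456789
        SELECT * FROM ice_tbl FOR VERSION AS OF 123456789;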
---
 .../create_preinstalled_scripts/iceberg/run09.sql  |  37 ++
 .../java/org/apache/doris/catalog/TableIf.java     |  11 -
 .../apache/doris/datasource/ExternalCatalog.java   |   2 +
 .../doris/datasource/ExternalSchemaCache.java      |   8 +-
 .../apache/doris/datasource/FileQueryScanNode.java |   9 +-
 .../org/apache/doris/datasource/FileScanNode.java  |   2 +-
 .../apache/doris/datasource/hive/HMSDlaTable.java  |   9 +
 .../doris/datasource/hive/HMSExternalCatalog.java  |   6 +
 .../doris/datasource/hive/HMSExternalTable.java    |  25 +-
 .../doris/datasource/hive/IcebergDlaTable.java     | 147 ++++++++
 .../datasource/iceberg/IcebergExternalCatalog.java |   1 -
 .../datasource/iceberg/IcebergExternalTable.java   | 350 ++-----------------
 .../datasource/iceberg/IcebergMetadataCache.java   |  17 +-
 .../doris/datasource/iceberg/IcebergUtils.java     | 375 ++++++++++++++++++++-
 .../datasource/iceberg/source/IcebergScanNode.java |  14 +-
 .../java/org/apache/doris/planner/ScanNode.java    |  10 +-
 .../iceberg/IcebergExternalTableTest.java          |  38 +--
 .../test_iceberg_schema_change_with_timetravel.out | Bin 0 -> 691 bytes
 ...st_iceberg_schema_change_with_timetravel.groovy |  87 +++++
 19 files changed, 746 insertions(+), 402 deletions(-)

diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
new file mode 100644
index 00000000000..c5795883c4d
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
@@ -0,0 +1,37 @@
+use demo.test_db;
+
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+
+alter table schema_change_with_time_travel add column c3 int; 
+insert into schema_change_with_time_travel values (4,5,6);
+
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+
+alter table schema_change_with_time_travel add column c4 int;
+
+
+create table schema_change_with_time_travel_orc (c1 int) tblproperties ("write.format.default"="orc");
+insert into schema_change_with_time_travel_orc values (1);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (2,3);
+
+alter table schema_change_with_time_travel_orc add column c3 int; 
+insert into schema_change_with_time_travel_orc values (4,5,6);
+
+alter table schema_change_with_time_travel_orc drop column c2;
+insert into schema_change_with_time_travel_orc values (7,8);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (9,10,11);
+
+alter table schema_change_with_time_travel_orc add column c4 int;
+
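
Note: after this script the table's current schema is (c1, c3, c2, c4), while
earlier snapshots were written with (c1), (c1, c2) and (c1, c2, c3). A sketch
of the queries the accompanying regression test can issue (the snapshot id is
hypothetical; real ids would be read from the table's snapshot metadata):

    -- resolved with the schema of that snapshot, e.g. only c1
    select * from schema_change_with_time_travel for version as of 1111111111;
    -- resolved with the latest schema: c1, c3, c2, c4
    select * from schema_change_with_time_travel;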
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 96fd8da54af..74da9a88309 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -140,17 +140,6 @@ public interface TableIf {
 
     Column getColumn(String name);
 
-    default int getBaseColumnIdxByName(String colName) {
-        int i = 0;
-        for (Column col : getBaseSchema()) {
-            if (col.getName().equalsIgnoreCase(colName)) {
-                return i;
-            }
-            ++i;
-        }
-        return -1;
-    }
-
     String getMysqlType();
 
     String getEngine();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 52ac5a28bda..d9c620ee0bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -125,6 +125,8 @@ public abstract class ExternalCatalog
             CREATE_TIME,
             USE_META_CACHE);
 
+    protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors();
+
     // Unique id of this catalog, will be assigned after catalog is loaded.
     @SerializedName(value = "id")
     protected long id;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index de3eeff75d9..d0673da56fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -96,11 +96,9 @@ public class ExternalSchemaCache {
     }
 
     public void invalidateTableCache(String dbName, String tblName) {
-        SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
-        schemaCache.invalidate(key);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("invalid schema cache for {}.{} in catalog {}", dbName, 
tblName, catalog.getName());
-        }
+        schemaCache.asMap().keySet().stream()
+            .filter(key -> key.dbName.equals(dbName) && key.tblName.equals(tblName))
+            .forEach(schemaCache::invalidate);
     }
 
     public void invalidateDbCache(String dbName) {
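
For illustration, a minimal self-contained sketch (plain Caffeine, not Doris
code; the Key record is a stand-in for SchemaCacheKey/IcebergSchemaCacheKey)
of why the invalidation above filters keys by name instead of removing one
exact key: the cache may now hold one entry per (db, table, schemaId), and an
exact-key invalidation would leave older schema versions behind.

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    public class PrefixInvalidationSketch {
        // hypothetical stand-in for the versioned schema cache key
        record Key(String db, String tbl, long schemaId) {}

        public static void main(String[] args) {
            Cache<Key, String> cache = Caffeine.newBuilder().build();
            cache.put(new Key("db1", "t1", 0L), "schema v0");
            cache.put(new Key("db1", "t1", 1L), "schema v1");
            cache.put(new Key("db1", "t2", 0L), "other table");
            // invalidating only new Key("db1", "t1", 0L) would keep "schema v1";
            // filtering on db/table drops every cached schema version of t1
            cache.asMap().keySet().stream()
                    .filter(k -> k.db().equals("db1") && k.tbl().equals("t1"))
                    .forEach(cache::invalidate);
            System.out.println(cache.asMap().keySet()); // [Key[db=db1, tbl=t2, schemaId=0]]
        }
    }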
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index c3351600fc6..6e852940cec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -251,7 +251,14 @@ public abstract class FileQueryScanNode extends FileScanNode {
             }
             SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId());
             String colName = slotDesc.getColumn().getName();
-            int idx = tbl.getBaseColumnIdxByName(colName);
+            int idx = -1;
+            List<Column> columns = getColumns();
+            for (int i = 0; i < columns.size(); i++) {
+                if (columns.get(i).getName().equals(colName)) {
+                    idx = i;
+                    break;
+                }
+            }
             if (idx == -1) {
                 throw new UserException("Column " + colName + " not found in 
table " + tbl.getName());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 8d3aeaa6a26..d2c6230ba6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -193,7 +193,7 @@ public abstract class FileScanNode extends ExternalScanNode {
         TExpr tExpr = new TExpr();
         tExpr.setNodes(Lists.newArrayList());
 
-        for (Column column : tbl.getBaseSchema()) {
+        for (Column column : getColumns()) {
             Expr expr;
             if (column.getDefaultValue() != null) {
                 if (column.getDefaultValueExprDef() != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
index 1710646ce3d..d5316aa7fdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
@@ -76,4 +76,13 @@ public abstract class HMSDlaTable implements MTMVBaseTableIf {
         Env.getCurrentEnv().getRefreshManager()
                 .refreshTable(hmsTable.getCatalog().getName(), hmsTable.getDbName(), hmsTable.getName(), true);
     }
+
+    /**
+     * Whether the table is supported as a related table.
+     * For example, an Iceberg table may become unsupported after partition evolution.
+     * @return true if the table can be used as a related table
+     */
+    protected boolean isValidRelatedTable() {
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 0ec5153a415..9b506d917f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -187,6 +187,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
                     String.valueOf(Config.hive_metastore_client_timeout_second));
         }
         HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+        threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+            ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+            Integer.MAX_VALUE,
+            String.format("hms_iceberg_catalog_%s_executor_pool", name),
+            true,
+            preExecutionAuthenticator);
         FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
                 this.bindBrokerName(), this.catalogProperty.getHadoopProperties());
         this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 1d520fc4178..cfd21534db0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -40,6 +40,8 @@ import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
 import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.iceberg.IcebergMvccSnapshot;
+import org.apache.doris.datasource.iceberg.IcebergSchemaCacheKey;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.mvcc.EmptyMvccSnapshot;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
@@ -211,7 +213,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
             } else {
                 if (supportedIcebergTable()) {
                     dlaType = DLAType.ICEBERG;
-                    dlaTable = new HiveDlaTable(this);
+                    dlaTable = new IcebergDlaTable(this);
                 } else if (supportedHoodieTable()) {
                     dlaType = DLAType.HUDI;
                     dlaTable = new HudiDlaTable(this);
@@ -315,6 +317,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         if (getDlaType() == DLAType.HUDI) {
             return ((HudiDlaTable) dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this))
                     .getSchema();
+        } else if (getDlaType() == DLAType.ICEBERG) {
+            return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(), getName());
         }
         Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(dbName, name);
         return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
@@ -619,7 +623,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
     public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
         makeSureInitialized();
         if (dlaType.equals(DLAType.ICEBERG)) {
-            return getIcebergSchema();
+            return getIcebergSchema(key);
         } else if (dlaType.equals(DLAType.HUDI)) {
             return getHudiSchema(key);
         } else {
@@ -627,10 +631,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         }
     }
 
-    private Optional<SchemaCacheValue> getIcebergSchema() {
-        List<Column> columns = IcebergUtils.getSchema(catalog, dbName, name);
-        List<Column> partitionColumns = initPartitionColumns(columns);
-        return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
+    private Optional<SchemaCacheValue> getIcebergSchema(SchemaCacheKey key) {
+        return IcebergUtils.loadSchemaCacheValue(catalog, dbName, name, ((IcebergSchemaCacheKey) key).getSchemaId());
     }
 
     private Optional<SchemaCacheValue> getHudiSchema(SchemaCacheKey key) {
@@ -1085,8 +1087,12 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
     public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
         if (getDlaType() == DLAType.HUDI) {
             return new HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this));
+        } else if (getDlaType() == DLAType.ICEBERG) {
+            return new IcebergMvccSnapshot(
+                IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot, getCatalog(), getDbName(), getName()));
+        } else {
+            return new EmptyMvccSnapshot();
         }
-        return new EmptyMvccSnapshot();
     }
 
     public boolean firstColumnIsString() {
@@ -1107,4 +1113,9 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
                 getRemoteTable().getSd().getLocation(),
                 getCatalog().getConfiguration());
     }
+
+    public boolean isValidRelatedTable() {
+        makeSureInitialized();
+        return dlaTable.isValidRelatedTable();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
new file mode 100644
index 00000000000..36b871282a9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class IcebergDlaTable extends HMSDlaTable {
+
+    private boolean isValidRelatedTableCached = false;
+    private boolean isValidRelatedTable = false;
+
+    public IcebergDlaTable(HMSExternalTable table) {
+        super(table);
+    }
+
+    @Override
+    public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return Maps.newHashMap(
+            IcebergUtils.getOrFetchSnapshotCacheValue(
+                    snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName())
+                .getPartitionInfo().getNameToPartitionItem());
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return isValidRelatedTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                    snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
+        IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+                hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName(),
+                snapshotValue.getSnapshot().getSchemaId());
+        return schemaValue.getPartitionColumns();
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+                                               Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
+        long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
+        if (latestSnapshotId <= 0) {
+            throw new AnalysisException("can not find partition: " + 
partitionName);
+        }
+        return new MTMVSnapshotIdSnapshot(latestSnapshotId);
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        hmsTable.makeSureInitialized();
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
+        return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
+    }
+
+    @Override
+    boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    @Override
+    protected boolean isValidRelatedTable() {
+        if (isValidRelatedTableCached) {
+            return isValidRelatedTable;
+        }
+        isValidRelatedTable = false;
+        Set<String> allFields = Sets.newHashSet();
+        Table table = IcebergUtils.getIcebergTable(
+                hmsTable.getCatalog(),
+                hmsTable.getDbName(),
+                hmsTable.getName()
+        );
+        for (PartitionSpec spec : table.specs().values()) {
+            if (spec == null) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            List<PartitionField> fields = spec.fields();
+            if (fields.size() != 1) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            PartitionField partitionField = spec.fields().get(0);
+            String transformName = partitionField.transform().toString();
+            if (!IcebergUtils.YEAR.equals(transformName)
+                    && !IcebergUtils.MONTH.equals(transformName)
+                    && !IcebergUtils.DAY.equals(transformName)
+                    && !IcebergUtils.HOUR.equals(transformName)) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            allFields.add(table.schema().findColumnName(partitionField.sourceId()));
+        }
+        isValidRelatedTableCached = true;
+        isValidRelatedTable = allFields.size() == 1;
+        return isValidRelatedTable;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 9bb7ca8ae08..225e14af420 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -47,7 +47,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     protected String icebergCatalogType;
     protected Catalog catalog;
-    private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16;
 
     public IcebergExternalCatalog(long catalogId, String name, String comment) {
         super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index 16964ead1d2..2e8cdc63196 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -17,25 +17,18 @@
 
 package org.apache.doris.datasource.iceberg;
 
-import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.PartitionItem;
-import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.PartitionType;
-import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.CacheException;
-import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import org.apache.doris.datasource.mvcc.MvccTable;
-import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
 import org.apache.doris.mtmv.MTMVRefreshContext;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
@@ -50,32 +43,12 @@ import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionsTable;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructProjection;
 
-import java.io.IOException;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.Month;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -85,15 +58,7 @@ import java.util.stream.Collectors;
 
 public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
 
-    public static final String YEAR = "year";
-    public static final String MONTH = "month";
-    public static final String DAY = "day";
-    public static final String HOUR = "hour";
-    public static final String IDENTITY = "identity";
-    public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec
-
     private Table table;
-    private List<Column> partitionColumns;
     private boolean isValidRelatedTableCached = false;
     private boolean isValidRelatedTable = false;
 
@@ -118,29 +83,9 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
         this.table = table;
     }
 
-    @VisibleForTesting
-    public void setPartitionColumns(List<Column> partitionColumns) {
-        this.partitionColumns = partitionColumns;
-    }
-
     @Override
     public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
-        table = getIcebergTable();
-        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name,
-                ((IcebergSchemaCacheKey) key).getSchemaId());
-        List<Column> tmpColumns = Lists.newArrayList();
-        PartitionSpec spec = table.spec();
-        for (PartitionField field : spec.fields()) {
-            Types.NestedField col = table.schema().findField(field.sourceId());
-            for (Column c : schema) {
-                if (c.getName().equalsIgnoreCase(col.name())) {
-                    tmpColumns.add(c);
-                    break;
-                }
-            }
-        }
-        partitionColumns = tmpColumns;
-        return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns));
+        return IcebergUtils.loadSchemaCacheValue(catalog, dbName, name, ((IcebergSchemaCacheKey) key).getSchemaId());
     }
 
     @Override
@@ -178,23 +123,21 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
 
-    private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() {
-        return Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
-            .getSnapshotCache(catalog, dbName, name);
-    }
-
     @Override
     public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
     }
 
     @Override
     public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem());
+        return Maps.newHashMap(
+            IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName())
+                .getPartitionInfo().getNameToPartitionItem());
     }
 
     @Override
     public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
+        return IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName())
+            .getPartitionInfo().getNameToPartitionItem();
     }
 
     @Override
@@ -209,15 +152,18 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
 
     @Override
     public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
-        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
-        IcebergSchemaCacheValue schemaValue = getIcebergSchemaCacheValue(snapshotValue.getSnapshot().getSchemaId());
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName());
+        IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+                catalog, getDbName(), getName(), snapshotValue.getSnapshot().getSchemaId());
         return schemaValue.getPartitionColumns();
     }
 
     @Override
     public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
                                                Optional<MvccSnapshot> snapshot) throws AnalysisException {
-        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName());
         long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
         if (latestSnapshotId <= 0) {
             throw new AnalysisException("can not find partition: " + partitionName);
@@ -229,7 +175,8 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
     public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
             throws AnalysisException {
         makeSureInitialized();
-        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName());
         return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
     }
 
@@ -264,10 +211,10 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
             }
             PartitionField partitionField = spec.fields().get(0);
             String transformName = partitionField.transform().toString();
-            if (!YEAR.equals(transformName)
-                    && !MONTH.equals(transformName)
-                    && !DAY.equals(transformName)
-                    && !HOUR.equals(transformName)) {
+            if (!IcebergUtils.YEAR.equals(transformName)
+                    && !IcebergUtils.MONTH.equals(transformName)
+                    && !IcebergUtils.DAY.equals(transformName)
+                    && !IcebergUtils.HOUR.equals(transformName)) {
                 isValidRelatedTableCached = true;
                 return false;
             }
@@ -280,27 +227,13 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
 
     @Override
     public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
-        return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
-    }
-
-    public long getLatestSnapshotId() {
-        table = getIcebergTable();
-        Snapshot snapshot = table.currentSnapshot();
-        return snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : table.currentSnapshot().snapshotId();
-    }
-
-    public long getSchemaId(long snapshotId) {
-        table = getIcebergTable();
-        return snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID
-                ? IcebergUtils.UNKNOWN_SNAPSHOT_ID
-                : table.snapshot(snapshotId).schemaId();
+        return new IcebergMvccSnapshot(IcebergUtils.getIcebergSnapshotCacheValue(
+                tableSnapshot, getCatalog(), getDbName(), getName()));
     }
 
     @Override
     public List<Column> getFullSchema() {
-        Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(this);
-        IcebergSnapshotCacheValue cacheValue = getOrFetchSnapshotCacheValue(snapshotFromContext);
-        return getIcebergSchemaCacheValue(cacheValue.getSnapshot().getSchemaId()).getSchema();
+        return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(), getName());
     }
 
     @Override
@@ -308,239 +241,6 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
         return true;
     }
 
-    public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long schemaId) {
-        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
-        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
-            new IcebergSchemaCacheKey(dbName, name, schemaId));
-        if (!schemaCacheValue.isPresent()) {
-            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
-                null, catalog.getName(), dbName, name, schemaId);
-        }
-        return (IcebergSchemaCacheValue) schemaCacheValue.get();
-    }
-
-    public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws AnalysisException {
-        // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, haven't contained any snapshot yet.
-        if (!isValidRelatedTable() || snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
-            return IcebergPartitionInfo.empty();
-        }
-        List<IcebergPartition> icebergPartitions = loadIcebergPartition(snapshotId);
-        Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
-        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
-        table = getIcebergTable();
-        partitionColumns = getIcebergSchemaCacheValue(table.snapshot(snapshotId).schemaId()).getPartitionColumns();
-        for (IcebergPartition partition : icebergPartitions) {
-            nameToPartition.put(partition.getPartitionName(), partition);
-            String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
-            Range<PartitionKey> partitionRange = getPartitionRange(
-                    partition.getPartitionValues().get(0), transform, partitionColumns);
-            PartitionItem item = new RangePartitionItem(partitionRange);
-            nameToPartitionItem.put(partition.getPartitionName(), item);
-        }
-        Map<String, Set<String>> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem);
-        return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap);
-    }
-
-    public List<IcebergPartition> loadIcebergPartition(long snapshotId) {
-        PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
-                .createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
-        List<IcebergPartition> partitions = Lists.newArrayList();
-        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
-            for (FileScanTask task : tasks) {
-                CloseableIterable<StructLike> rows = task.asDataTask().rows();
-                for (StructLike row : rows) {
-                    partitions.add(generateIcebergPartition(row));
-                }
-            }
-        } catch (IOException e) {
-            LOG.warn("Failed to get Iceberg table {} partition info.", name, 
e);
-        }
-        return partitions;
-    }
-
-    public IcebergPartition generateIcebergPartition(StructLike row) {
-        // row format :
-        // 0. partitionData,
-        // 1. spec_id,
-        // 2. record_count,
-        // 3. file_count,
-        // 4. total_data_file_size_in_bytes,
-        // 5. position_delete_record_count,
-        // 6. position_delete_file_count,
-        // 7. equality_delete_record_count,
-        // 8. equality_delete_file_count,
-        // 9. last_updated_at,
-        // 10. last_updated_snapshot_id
-        table = getIcebergTable();
-        Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partition table.");
-        int specId = row.get(1, Integer.class);
-        PartitionSpec partitionSpec = table.specs().get(specId);
-        StructProjection partitionData = row.get(0, StructProjection.class);
-        StringBuilder sb = new StringBuilder();
-        List<String> partitionValues = Lists.newArrayList();
-        List<String> transforms = Lists.newArrayList();
-        for (int i = 0; i < partitionSpec.fields().size(); ++i) {
-            PartitionField partitionField = partitionSpec.fields().get(i);
-            Class<?> fieldClass = partitionSpec.javaClasses()[i];
-            int fieldId = partitionField.fieldId();
-            // Iceberg partition field id starts at PARTITION_DATA_ID_START,
-            // So we can get the field index in partitionData using fieldId - PARTITION_DATA_ID_START
-            int index = fieldId - PARTITION_DATA_ID_START;
-            Object o = partitionData.get(index, fieldClass);
-            String fieldValue = o == null ? null : o.toString();
-            String fieldName = partitionField.name();
-            sb.append(fieldName);
-            sb.append("=");
-            sb.append(fieldValue);
-            sb.append("/");
-            partitionValues.add(fieldValue);
-            transforms.add(partitionField.transform().toString());
-        }
-        if (sb.length() > 0) {
-            sb.delete(sb.length() - 1, sb.length());
-        }
-        String partitionName = sb.toString();
-        long recordCount = row.get(2, Long.class);
-        long fileCount = row.get(3, Integer.class);
-        long fileSizeInBytes = row.get(4, Long.class);
-        long lastUpdateTime = row.get(9, Long.class);
-        long lastUpdateSnapShotId = row.get(10, Long.class);
-        return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount,
-                lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
-    }
-
-    @VisibleForTesting
-    public Range<PartitionKey> getPartitionRange(String value, String transform, List<Column> partitionColumns)
-            throws AnalysisException {
-        // For NULL value, create a minimum partition for it.
-        if (value == null) {
-            PartitionKey nullLowKey = PartitionKey.createPartitionKey(
-                    Lists.newArrayList(new PartitionValue("0000-01-01")), 
partitionColumns);
-            PartitionKey nullUpKey = nullLowKey.successor();
-            return Range.closedOpen(nullLowKey, nullUpKey);
-        }
-        LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
-        LocalDateTime target;
-        LocalDateTime lower;
-        LocalDateTime upper;
-        long longValue = Long.parseLong(value);
-        switch (transform) {
-            case HOUR:
-                target = epoch.plusHours(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(),
-                        target.getHour(), 0, 0);
-                upper = lower.plusHours(1);
-                break;
-            case DAY:
-                target = epoch.plusDays(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0);
-                upper = lower.plusDays(1);
-                break;
-            case MONTH:
-                target = epoch.plusMonths(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0);
-                upper = lower.plusMonths(1);
-                break;
-            case YEAR:
-                target = epoch.plusYears(longValue);
-                lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0);
-                upper = lower.plusYears(1);
-                break;
-            default:
-                throw new RuntimeException("Unsupported transform " + 
transform);
-        }
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-        Column c = partitionColumns.get(0);
-        Preconditions.checkState(c.getDataType().isDateType(), "Only support date type partition column");
-        if (c.getType().isDate() || c.getType().isDateV2()) {
-            formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
-        }
-        PartitionValue lowerValue = new PartitionValue(lower.format(formatter));
-        PartitionValue upperValue = new PartitionValue(upper.format(formatter));
-        PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns);
-        PartitionKey upperKey =  PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns);
-        return Range.closedOpen(lowKey, upperKey);
-    }
-
-    /**
-     * Merge overlapped iceberg partitions into one Doris partition.
-     */
-    public Map<String, Set<String>> mergeOverlapPartitions(Map<String, PartitionItem> originPartitions) {
-        List<Map.Entry<String, PartitionItem>> entries = sortPartitionMap(originPartitions);
-        Map<String, Set<String>> map = Maps.newHashMap();
-        for (int i = 0; i < entries.size() - 1; i++) {
-            Range<PartitionKey> firstValue = entries.get(i).getValue().getItems();
-            String firstKey = entries.get(i).getKey();
-            Range<PartitionKey> secondValue = entries.get(i + 1).getValue().getItems();
-            String secondKey = entries.get(i + 1).getKey();
-            // If the first entry enclose the second one, remove the second entry and keep a record in the return map.
-            // So we can track the iceberg partitions those contained by one Doris partition.
-            while (i < entries.size() && firstValue.encloses(secondValue)) {
-                originPartitions.remove(secondKey);
-                map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
-                String finalSecondKey = secondKey;
-                map.computeIfPresent(firstKey, (key, value) -> {
-                    value.add(finalSecondKey);
-                    return value;
-                });
-                i++;
-                if (i >= entries.size() - 1) {
-                    break;
-                }
-                secondValue = entries.get(i + 1).getValue().getItems();
-                secondKey = entries.get(i + 1).getKey();
-            }
-        }
-        return map;
-    }
-
-    /**
-     * Sort the given map entries by PartitionItem Range(LOW, HIGH)
-     * When comparing two ranges, the one with smaller LOW value is smaller than the other one.
-     * If two ranges have same values of LOW, the one with larger HIGH value is smaller.
-     *
-     * For now, we only support year, month, day and hour,
-     * so it is impossible to have two partially intersect partitions.
-     * One range is either enclosed by another or has no intersection at all with another.
-     *
-     *
-     * For example, we have these 4 ranges:
-     * [10, 20), [30, 40), [0, 30), [10, 15)
-     *
-     * After sort, they become:
-     * [0, 30), [10, 20), [10, 15), [30, 40)
-     */
-    public List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String, PartitionItem> originPartitions) {
-        List<Map.Entry<String, PartitionItem>> entries = new ArrayList<>(originPartitions.entrySet());
-        entries.sort(new RangeComparator());
-        return entries;
-    }
-
-    public static class RangeComparator implements Comparator<Map.Entry<String, PartitionItem>> {
-        @Override
-        public int compare(Map.Entry<String, PartitionItem> p1, Map.Entry<String, PartitionItem> p2) {
-            PartitionItem value1 = p1.getValue();
-            PartitionItem value2 = p2.getValue();
-            if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) {
-                Range<PartitionKey> items1 = value1.getItems();
-                Range<PartitionKey> items2 = value2.getItems();
-                if (!items1.hasLowerBound()) {
-                    return -1;
-                }
-                if (!items2.hasLowerBound()) {
-                    return 1;
-                }
-                PartitionKey upper1 = items1.upperEndpoint();
-                PartitionKey lower1 = items1.lowerEndpoint();
-                PartitionKey upper2 = items2.upperEndpoint();
-                PartitionKey lower2 = items2.lowerEndpoint();
-                int compareLow = lower1.compareTo(lower2);
-                return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
-            }
-            return 0;
-        }
-    }
-
     @VisibleForTesting
     public boolean isValidRelatedTableCached() {
         return isValidRelatedTableCached;
@@ -554,12 +254,4 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
     public void setIsValidRelatedTableCached(boolean isCached) {
         this.isValidRelatedTableCached = isCached;
     }
-
-    public IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
-        if (snapshot.isPresent()) {
-            return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
-        } else {
-            return getIcebergSnapshotCacheValue();
-        }
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index f3daf2d2795..57c5eb20c64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.thrift.TIcebergMetadataParams;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -136,12 +137,18 @@ public class IcebergMetadataCache {
 
     @NotNull
     private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey key) throws AnalysisException {
-        IcebergExternalTable table = (IcebergExternalTable) key.catalog.getDbOrAnalysisException(key.dbName)
+        MTMVRelatedTableIf table = (MTMVRelatedTableIf) key.catalog.getDbOrAnalysisException(key.dbName)
                 .getTableOrAnalysisException(key.tableName);
-        long snapshotId = table.getLatestSnapshotId();
-        long schemaId = table.getSchemaId(snapshotId);
-        IcebergPartitionInfo icebergPartitionInfo = table.loadPartitionInfo(snapshotId);
-        return new IcebergSnapshotCacheValue(icebergPartitionInfo, new IcebergSnapshot(snapshotId, schemaId));
+        IcebergSnapshot lastedIcebergSnapshot = IcebergUtils.getLastedIcebergSnapshot(
+                (ExternalCatalog) key.catalog, key.dbName, key.tableName);
+        IcebergPartitionInfo icebergPartitionInfo;
+        if (!table.isValidRelatedTable()) {
+            icebergPartitionInfo = IcebergPartitionInfo.empty();
+        } else {
+            icebergPartitionInfo = IcebergUtils.loadPartitionInfo(
+                (ExternalCatalog) key.catalog, key.dbName, key.tableName, lastedIcebergSnapshot.getSnapshotId());
+        }
+        return new IcebergSnapshotCacheValue(icebergPartitionInfo, lastedIcebergSnapshot);
     }
 
     public void invalidateCatalogCache(long catalogId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index a3cbfec688e..f1d6c4eb033 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -31,37 +31,58 @@ import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.Subquery;
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionsTable;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.And;
@@ -77,13 +98,25 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -114,7 +147,15 @@ public class IcebergUtils {
     // nickname in spark
     public static final String SPARK_SQL_COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";
 
-    public static final long UNKNOWN_SNAPSHOT_ID = -1;
+    public static final long UNKNOWN_SNAPSHOT_ID = -1;  // means an empty table
+    public static final long NEWEST_SCHEMA_ID = -1;
+
+    public static final String YEAR = "year";
+    public static final String MONTH = "month";
+    public static final String DAY = "day";
+    public static final String HOUR = "hour";
+    public static final String IDENTITY = "identity";
+    public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec
 
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
@@ -579,10 +620,6 @@ public class IcebergUtils {
                 : metadataCache.getIcebergTable(catalog, dbName, tblName);
     }
 
-    public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
-        return getSchema(catalog, dbName, name, UNKNOWN_SNAPSHOT_ID);
-    }
-
     /**
      * Get iceberg schema from catalog and convert them to doris schema
      */
@@ -591,7 +628,7 @@ public class IcebergUtils {
             return catalog.getPreExecutionAuthenticator().execute(() -> {
                 org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name);
                 Schema schema;
-                if (schemaId == UNKNOWN_SNAPSHOT_ID || icebergTable.currentSnapshot() == null) {
+                if (schemaId == NEWEST_SCHEMA_ID || icebergTable.currentSnapshot() == null) {
                     schema = icebergTable.schema();
                 } else {
                     schema = icebergTable.schemas().get((int) schemaId);
@@ -744,4 +781,330 @@ public class IcebergUtils {
 
         return matchingManifests;
     }
+
+    // get snapshot id from query like 'for version/time as of'
+    public static long getQuerySpecSnapshot(Table table, TableSnapshot queryTableSnapshot) {
+        TableSnapshot.VersionType type = queryTableSnapshot.getType();
+        if (type == TableSnapshot.VersionType.VERSION) {
+            return queryTableSnapshot.getVersion();
+        } else {
+            long timestamp = TimeUtils.timeStringToLong(queryTableSnapshot.getTime(), TimeUtils.getTimeZone());
+            if (timestamp < 0) {
+                throw new DateTimeException("can't parse time: " + 
queryTableSnapshot.getTime());
+            }
+            return SnapshotUtil.snapshotIdAsOfTime(table, timestamp);
+        }
+    }
+
+    // read schema from external schema cache
+    public static IcebergSchemaCacheValue getSchemaCacheValue(
+            ExternalCatalog catalog, String dbName, String name, long schemaId) {
+        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+            new IcebergSchemaCacheKey(dbName, name, schemaId));
+        if (!schemaCacheValue.isPresent()) {
+            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                null, catalog.getName(), dbName, name, schemaId);
+        }
+        return (IcebergSchemaCacheValue) schemaCacheValue.get();
+    }
+
+    public static IcebergSnapshot getLastedIcebergSnapshot(ExternalCatalog catalog, String dbName, String tbName) {
+        Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+        Snapshot snapshot = table.currentSnapshot();
+        long snapshotId = snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : snapshot.snapshotId();
+        return new IcebergSnapshot(snapshotId, table.schema().schemaId());
+    }
+
+    public static IcebergPartitionInfo loadPartitionInfo(
+            ExternalCatalog catalog, String dbName, String tbName, long snapshotId) throws AnalysisException {
+        // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table that has no snapshot yet.
+        if (snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
+            return IcebergPartitionInfo.empty();
+        }
+        Table table = getIcebergTable(catalog, dbName, tbName);
+        List<IcebergPartition> icebergPartitions = loadIcebergPartition(table, 
snapshotId);
+        Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
+        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+
+        List<Column> partitionColumns = IcebergUtils.getSchemaCacheValue(
+                catalog, dbName, tbName, 
table.snapshot(snapshotId).schemaId()).getPartitionColumns();
+        for (IcebergPartition partition : icebergPartitions) {
+            nameToPartition.put(partition.getPartitionName(), partition);
+            String transform = 
table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
+            Range<PartitionKey> partitionRange = getPartitionRange(
+                    partition.getPartitionValues().get(0), transform, 
partitionColumns);
+            PartitionItem item = new RangePartitionItem(partitionRange);
+            nameToPartitionItem.put(partition.getPartitionName(), item);
+        }
+        Map<String, Set<String>> partitionNameMap = 
mergeOverlapPartitions(nameToPartitionItem);
+        return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, 
partitionNameMap);
+    }
+
+    private static List<IcebergPartition> loadIcebergPartition(Table table, long snapshotId) {
+        PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
+                .createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
+        List<IcebergPartition> partitions = Lists.newArrayList();
+        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
+            for (FileScanTask task : tasks) {
+                CloseableIterable<StructLike> rows = task.asDataTask().rows();
+                for (StructLike row : rows) {
+                    partitions.add(generateIcebergPartition(table, row));
+                }
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to get Iceberg table {} partition info.", table.name(), e);
+        }
+        return partitions;
+    }
+
+    private static IcebergPartition generateIcebergPartition(Table table, StructLike row) {
+        // row format :
+        // 0. partitionData,
+        // 1. spec_id,
+        // 2. record_count,
+        // 3. file_count,
+        // 4. total_data_file_size_in_bytes,
+        // 5. position_delete_record_count,
+        // 6. position_delete_file_count,
+        // 7. equality_delete_record_count,
+        // 8. equality_delete_file_count,
+        // 9. last_updated_at,
+        // 10. last_updated_snapshot_id
+        Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partition table.");
+        int specId = row.get(1, Integer.class);
+        PartitionSpec partitionSpec = table.specs().get(specId);
+        StructProjection partitionData = row.get(0, StructProjection.class);
+        StringBuilder sb = new StringBuilder();
+        List<String> partitionValues = Lists.newArrayList();
+        List<String> transforms = Lists.newArrayList();
+        for (int i = 0; i < partitionSpec.fields().size(); ++i) {
+            PartitionField partitionField = partitionSpec.fields().get(i);
+            Class<?> fieldClass = partitionSpec.javaClasses()[i];
+            int fieldId = partitionField.fieldId();
+            // Iceberg partition field ids start at PARTITION_DATA_ID_START,
+            // so the field index in partitionData is fieldId - PARTITION_DATA_ID_START.
+            int index = fieldId - PARTITION_DATA_ID_START;
+            Object o = partitionData.get(index, fieldClass);
+            String fieldValue = o == null ? null : o.toString();
+            String fieldName = partitionField.name();
+            sb.append(fieldName);
+            sb.append("=");
+            sb.append(fieldValue);
+            sb.append("/");
+            partitionValues.add(fieldValue);
+            transforms.add(partitionField.transform().toString());
+        }
+        if (sb.length() > 0) {
+            sb.delete(sb.length() - 1, sb.length());
+        }
+        String partitionName = sb.toString();
+        long recordCount = row.get(2, Long.class);
+        long fileCount = row.get(3, Integer.class);
+        long fileSizeInBytes = row.get(4, Long.class);
+        long lastUpdateTime = row.get(9, Long.class);
+        long lastUpdateSnapShotId = row.get(10, Long.class);
+        return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount,
+            lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
+    }
+
+    @VisibleForTesting
+    public static Range<PartitionKey> getPartitionRange(String value, String transform, List<Column> partitionColumns)
+            throws AnalysisException {
+        // For NULL value, create a minimum partition for it.
+        if (value == null) {
+            PartitionKey nullLowKey = PartitionKey.createPartitionKey(
+                    Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns);
+            PartitionKey nullUpKey = nullLowKey.successor();
+            return Range.closedOpen(nullLowKey, nullUpKey);
+        }
+        LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
+        LocalDateTime target;
+        LocalDateTime lower;
+        LocalDateTime upper;
+        long longValue = Long.parseLong(value);
+        switch (transform) {
+            case HOUR:
+                target = epoch.plusHours(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(),
+                    target.getHour(), 0, 0);
+                upper = lower.plusHours(1);
+                break;
+            case DAY:
+                target = epoch.plusDays(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0);
+                upper = lower.plusDays(1);
+                break;
+            case MONTH:
+                target = epoch.plusMonths(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0);
+                upper = lower.plusMonths(1);
+                break;
+            case YEAR:
+                target = epoch.plusYears(longValue);
+                lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0);
+                upper = lower.plusYears(1);
+                break;
+            default:
+                throw new RuntimeException("Unsupported transform " + transform);
+        }
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+        Column c = partitionColumns.get(0);
+        Preconditions.checkState(c.getDataType().isDateType(), "Only support date type partition column");
+        if (c.getType().isDate() || c.getType().isDateV2()) {
+            formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+        }
+        PartitionValue lowerValue = new PartitionValue(lower.format(formatter));
+        PartitionValue upperValue = new PartitionValue(upper.format(formatter));
+        PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns);
+        PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns);
+        return Range.closedOpen(lowKey, upperKey);
+    }
+
+    /**
+     * Merge overlapping iceberg partitions into one Doris partition.
+     */
+    @VisibleForTesting
+    public static Map<String, Set<String>> mergeOverlapPartitions(Map<String, PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = sortPartitionMap(originPartitions);
+        Map<String, Set<String>> map = Maps.newHashMap();
+        for (int i = 0; i < entries.size() - 1; i++) {
+            Range<PartitionKey> firstValue = entries.get(i).getValue().getItems();
+            String firstKey = entries.get(i).getKey();
+            Range<PartitionKey> secondValue = entries.get(i + 1).getValue().getItems();
+            String secondKey = entries.get(i + 1).getKey();
+            // If the first entry encloses the second one, remove the second entry and keep a record in the return map,
+            // so we can track the iceberg partitions that are contained by one Doris partition.
+            while (i < entries.size() && firstValue.encloses(secondValue)) {
+                originPartitions.remove(secondKey);
+                map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
+                String finalSecondKey = secondKey;
+                map.computeIfPresent(firstKey, (key, value) -> {
+                    value.add(finalSecondKey);
+                    return value;
+                });
+                i++;
+                if (i >= entries.size() - 1) {
+                    break;
+                }
+                secondValue = entries.get(i + 1).getValue().getItems();
+                secondKey = entries.get(i + 1).getKey();
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Sort the given map entries by PartitionItem Range(LOW, HIGH).
+     * When comparing two ranges, the one with the smaller LOW value is smaller than the other one.
+     * If two ranges have the same LOW value, the one with the larger HIGH value is smaller.
+     *
+     * For now, we only support year, month, day and hour,
+     * so it is impossible to have two partially intersecting partitions.
+     * One range is either enclosed by another or has no intersection at all with another.
+     *
+     * For example, we have these 4 ranges:
+     * [10, 20), [30, 40), [0, 30), [10, 15)
+     *
+     * After sorting, they become:
+     * [0, 30), [10, 20), [10, 15), [30, 40)
+     */
+    @VisibleForTesting
+    public static List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String, PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = new ArrayList<>(originPartitions.entrySet());
+        entries.sort(new RangeComparator());
+        return entries;
+    }
+
+    public static class RangeComparator implements Comparator<Map.Entry<String, PartitionItem>> {
+        @Override
+        public int compare(Map.Entry<String, PartitionItem> p1, Map.Entry<String, PartitionItem> p2) {
+            PartitionItem value1 = p1.getValue();
+            PartitionItem value2 = p2.getValue();
+            if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) {
+                Range<PartitionKey> items1 = value1.getItems();
+                Range<PartitionKey> items2 = value2.getItems();
+                if (!items1.hasLowerBound()) {
+                    return -1;
+                }
+                if (!items2.hasLowerBound()) {
+                    return 1;
+                }
+                PartitionKey upper1 = items1.upperEndpoint();
+                PartitionKey lower1 = items1.lowerEndpoint();
+                PartitionKey upper2 = items2.upperEndpoint();
+                PartitionKey lower2 = items2.lowerEndpoint();
+                int compareLow = lower1.compareTo(lower2);
+                return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
+            }
+            return 0;
+        }
+    }
+
+    public static IcebergSnapshotCacheValue getIcebergSnapshotCacheValue(
+            Optional<TableSnapshot> tableSnapshot,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        IcebergSnapshotCacheValue snapshotCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+                .getSnapshotCache(catalog, dbName, tbName);
+        if (tableSnapshot.isPresent()) {
+            // If a snapshot is specified,
+            // use the specified snapshot and the corresponding schema (not the latest schema).
+            Table icebergTable = getIcebergTable(catalog, dbName, tbName);
+            TableSnapshot snapshot = tableSnapshot.get();
+            long querySpecSnapshot = getQuerySpecSnapshot(icebergTable, snapshot);
+            return new IcebergSnapshotCacheValue(
+                    IcebergPartitionInfo.empty(),
+                    new IcebergSnapshot(querySpecSnapshot, icebergTable.snapshot(querySpecSnapshot).schemaId()));
+        } else {
+            // Otherwise, use the latest snapshot and the latest schema.
+            return snapshotCache;
+        }
+    }
+
+    // Load the table schema from the Iceberg API into the external schema cache.
+    public static Optional<SchemaCacheValue> loadSchemaCacheValue(
+            ExternalCatalog catalog, String dbName, String tbName, long schemaId) {
+        Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, tbName, schemaId);
+        List<Column> tmpColumns = Lists.newArrayList();
+        PartitionSpec spec = table.spec();
+        for (PartitionField field : spec.fields()) {
+            Types.NestedField col = table.schema().findField(field.sourceId());
+            for (Column c : schema) {
+                if (c.getName().equalsIgnoreCase(col.name())) {
+                    tmpColumns.add(c);
+                    break;
+                }
+            }
+        }
+        return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
+    }
+
+    public static List<Column> getIcebergSchema(
+            TableIf tableIf,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(tableIf);
+        IcebergSnapshotCacheValue cacheValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshotFromContext, catalog, dbName, tbName);
+        return IcebergUtils.getSchemaCacheValue(
+                catalog, dbName, tbName, cacheValue.getSnapshot().getSchemaId())
+            .getSchema();
+    }
+
+    public static IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(
+            Optional<MvccSnapshot> snapshot,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        if (snapshot.isPresent()) {
+            return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
+        } else {
+            return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(), catalog, dbName, tbName);
+        }
+    }
 }
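
The helpers above encode the core rule of this fix: a time-travel query must resolve its schema from the schemaId recorded by the requested snapshot, while a plain query takes the table's latest schema. A minimal standalone sketch of that resolution against the public Iceberg Table API (the class and method names below are illustrative, not part of this patch):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    public class SnapshotSchemaResolver {
        // Resolve the schema for a snapshot id; a null id means "no snapshot specified".
        static Schema resolveSchema(Table table, Long snapshotId) {
            if (snapshotId == null || table.currentSnapshot() == null) {
                return table.schema(); // no time travel: use the latest schema
            }
            Snapshot snapshot = table.snapshot(snapshotId);
            Integer schemaId = snapshot.schemaId();
            // Older metadata may not record a schema id; fall back to the latest schema.
            return schemaId == null ? table.schema() : table.schemas().get(schemaId);
        }
    }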
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 30a2380a272..a634aa6c91f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -27,7 +27,6 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
@@ -66,13 +65,11 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
-import java.time.DateTimeException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -392,16 +389,7 @@ public class IcebergScanNode extends FileQueryScanNode {
     public Long getSpecifiedSnapshot() {
         TableSnapshot tableSnapshot = getQueryTableSnapshot();
         if (tableSnapshot != null) {
-            TableSnapshot.VersionType type = tableSnapshot.getType();
-            if (type == TableSnapshot.VersionType.VERSION) {
-                return tableSnapshot.getVersion();
-            } else {
-                long timestamp = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
-                if (timestamp < 0) {
-                    throw new DateTimeException("can't parse time: " + tableSnapshot.getTime());
-                }
-                return SnapshotUtil.snapshotIdAsOfTime(icebergTable, timestamp);
-            }
+            return IcebergUtils.getQuerySpecSnapshot(icebergTable, tableSnapshot);
         }
         return null;
     }
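
This hunk replaces the inlined version/time resolution with the shared IcebergUtils.getQuerySpecSnapshot helper, so scan nodes and snapshot-cache loading agree on which snapshot a query targets. What that resolution computes, as a standalone sketch (the resolver class and its parameters are illustrative; SnapshotUtil is the real Iceberg utility):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.util.SnapshotUtil;

    public class TimeTravelResolver {
        // 'FOR VERSION AS OF' carries a snapshot id directly; 'FOR TIME AS OF' is
        // resolved to the latest snapshot committed at or before the timestamp.
        static long resolveSnapshotId(Table table, Long version, long timestampMillis) {
            if (version != null) {
                return version;
            }
            return SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis);
        }
    }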
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 37488a001f6..1dde5f633ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -110,6 +110,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
     protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
 
     protected TableSnapshot tableSnapshot;
+    protected List<Column> columns;
 
     // Save the id of backends which this scan node will be executed on.
     // This is also important for local shuffle logic.
@@ -141,6 +142,13 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
         return result;
     }
 
+    protected List<Column> getColumns() {
+        if (columns == null && desc.getTable() != null) {
+            columns = desc.getTable().getBaseSchema();
+        }
+        return columns;
+    }
+
     public TupleDescriptor getTupleDesc() {
         return desc;
     }
@@ -233,7 +241,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
         // for load scan node, table is null
         // partitionsInfo maybe null for other scan node, eg: ExternalScanNode...
         if (desc.getTable() != null) {
-            computeColumnsFilter(desc.getTable().getBaseSchema(), partitionsInfo);
+            computeColumnsFilter(getColumns(), partitionsInfo);
         }
     }
 
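
The new columns field lets a scan node pin the schema it plans against: getColumns() reads the table's base schema once and reuses that list, and per the commit summary a generated scan node saves its schema instead of re-reading the cache, so a concurrent refresh of the external schema cache cannot change the column list mid-planning. A simplified sketch of the pattern (class, field, and method names here are abbreviated, not the actual ScanNode API):

    import java.util.List;

    abstract class PinnedSchemaScanNode {
        private List<String> columns; // pinned on first access, or pre-set by a subclass

        abstract List<String> fetchBaseSchema(); // e.g. from the table descriptor

        protected List<String> getColumns() {
            if (columns == null) {
                columns = fetchBaseSchema(); // later calls reuse the pinned copy
            }
            return columns;
        }
    }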
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
index 49144d874a4..f7305e6eb7c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -171,42 +171,39 @@ public class IcebergExternalTableTest {
 
     @Test
     public void testGetPartitionRange() throws AnalysisException {
-        IcebergExternalDatabase database = new IcebergExternalDatabase(null, 1L, "2", "2");
-        IcebergExternalTable table = new IcebergExternalTable(1, "1", "1", null, database);
         Column c = new Column("ts", PrimitiveType.DATETIMEV2);
         List<Column> partitionColumns = Lists.newArrayList(c);
-        table.setPartitionColumns(partitionColumns);
 
         // Test null partition value
-        Range<PartitionKey> nullRange = table.getPartitionRange(null, "hour", partitionColumns);
+        Range<PartitionKey> nullRange = IcebergUtils.getPartitionRange(null, "hour", partitionColumns);
         Assertions.assertEquals("0000-01-01 00:00:00",
                 nullRange.lowerEndpoint().getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("0000-01-01 00:00:01",
                 nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0));
 
         // Test hour transform.
-        Range<PartitionKey> hour = table.getPartitionRange("100", "hour", partitionColumns);
+        Range<PartitionKey> hour = IcebergUtils.getPartitionRange("100", "hour", partitionColumns);
         PartitionKey lowKey = hour.lowerEndpoint();
         PartitionKey upKey = hour.upperEndpoint();
         Assertions.assertEquals("1970-01-05 04:00:00", lowKey.getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("1970-01-05 05:00:00", upKey.getPartitionValuesAsStringList().get(0));
 
         // Test day transform.
-        Range<PartitionKey> day = table.getPartitionRange("100", "day", partitionColumns);
+        Range<PartitionKey> day = IcebergUtils.getPartitionRange("100", "day", partitionColumns);
         lowKey = day.lowerEndpoint();
         upKey = day.upperEndpoint();
         Assertions.assertEquals("1970-04-11 00:00:00", lowKey.getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("1970-04-12 00:00:00", upKey.getPartitionValuesAsStringList().get(0));
 
         // Test month transform.
-        Range<PartitionKey> month = table.getPartitionRange("100", "month", partitionColumns);
+        Range<PartitionKey> month = IcebergUtils.getPartitionRange("100", "month", partitionColumns);
         lowKey = month.lowerEndpoint();
         upKey = month.upperEndpoint();
         Assertions.assertEquals("1978-05-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("1978-06-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0));
 
         // Test year transform.
-        Range<PartitionKey> year = table.getPartitionRange("100", "year", partitionColumns);
+        Range<PartitionKey> year = IcebergUtils.getPartitionRange("100", "year", partitionColumns);
         lowKey = year.lowerEndpoint();
         upKey = year.upperEndpoint();
         Assertions.assertEquals("2070-01-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0));
@@ -214,26 +211,23 @@ public class IcebergExternalTableTest {
 
         // Test unsupported transform
         Exception exception = Assertions.assertThrows(RuntimeException.class, () -> {
-            table.getPartitionRange("100", "bucket", partitionColumns);
+            IcebergUtils.getPartitionRange("100", "bucket", partitionColumns);
         });
         Assertions.assertEquals("Unsupported transform bucket", exception.getMessage());
     }
 
     @Test
     public void testSortRange() throws AnalysisException {
-        IcebergExternalDatabase database = new IcebergExternalDatabase(null, 1L, "2", "2");
-        IcebergExternalTable table = new IcebergExternalTable(1, "1", "1", null, database);
         Column c = new Column("c", PrimitiveType.DATETIMEV2);
         ArrayList<Column> columns = Lists.newArrayList(c);
-        table.setPartitionColumns(Lists.newArrayList(c));
-        PartitionItem nullRange = new RangePartitionItem(table.getPartitionRange(null, "hour", columns));
-        PartitionItem year1970 = new RangePartitionItem(table.getPartitionRange("0", "year", columns));
-        PartitionItem year1971 = new RangePartitionItem(table.getPartitionRange("1", "year", columns));
-        PartitionItem month197002 = new RangePartitionItem(table.getPartitionRange("1", "month", columns));
-        PartitionItem month197103 = new RangePartitionItem(table.getPartitionRange("14", "month", columns));
-        PartitionItem month197204 = new RangePartitionItem(table.getPartitionRange("27", "month", columns));
-        PartitionItem day19700202 = new RangePartitionItem(table.getPartitionRange("32", "day", columns));
-        PartitionItem day19730101 = new RangePartitionItem(table.getPartitionRange("1096", "day", columns));
+        PartitionItem nullRange = new RangePartitionItem(IcebergUtils.getPartitionRange(null, "hour", columns));
+        PartitionItem year1970 = new RangePartitionItem(IcebergUtils.getPartitionRange("0", "year", columns));
+        PartitionItem year1971 = new RangePartitionItem(IcebergUtils.getPartitionRange("1", "year", columns));
+        PartitionItem month197002 = new RangePartitionItem(IcebergUtils.getPartitionRange("1", "month", columns));
+        PartitionItem month197103 = new RangePartitionItem(IcebergUtils.getPartitionRange("14", "month", columns));
+        PartitionItem month197204 = new RangePartitionItem(IcebergUtils.getPartitionRange("27", "month", columns));
+        PartitionItem day19700202 = new RangePartitionItem(IcebergUtils.getPartitionRange("32", "day", columns));
+        PartitionItem day19730101 = new RangePartitionItem(IcebergUtils.getPartitionRange("1096", "day", columns));
         Map<String, PartitionItem> map = Maps.newHashMap();
         map.put("nullRange", nullRange);
         map.put("year1970", year1970);
@@ -243,7 +237,7 @@ public class IcebergExternalTableTest {
         map.put("month197204", month197204);
         map.put("day19700202", day19700202);
         map.put("day19730101", day19730101);
-        List<Map.Entry<String, PartitionItem>> entries = table.sortPartitionMap(map);
+        List<Map.Entry<String, PartitionItem>> entries = IcebergUtils.sortPartitionMap(map);
         Assertions.assertEquals(8, entries.size());
         Assertions.assertEquals("nullRange", entries.get(0).getKey());
         Assertions.assertEquals("year1970", entries.get(1).getKey());
@@ -254,7 +248,7 @@ public class IcebergExternalTableTest {
         Assertions.assertEquals("month197204", entries.get(6).getKey());
         Assertions.assertEquals("day19730101", entries.get(7).getKey());
 
-        Map<String, Set<String>> stringSetMap = table.mergeOverlapPartitions(map);
+        Map<String, Set<String>> stringSetMap = IcebergUtils.mergeOverlapPartitions(map);
         Assertions.assertEquals(2, stringSetMap.size());
         Assertions.assertTrue(stringSetMap.containsKey("year1970"));
         Assertions.assertTrue(stringSetMap.containsKey("year1971"));
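
The expected values in these tests follow from Iceberg's transform encoding: an hour/day/month/year partition value is simply the offset from the Unix epoch in that unit, which getPartitionRange turns into a [lower, upper) range. A small self-contained check of the arithmetic (this demo class is illustrative, not part of the test suite):

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public class TransformValueDemo {
        public static void main(String[] args) {
            LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
            System.out.println(epoch.plusHours(100));   // 1970-01-05T04:00, 'hour' value 100
            System.out.println(epoch.plusDays(100));    // 1970-04-11T00:00, 'day' value 100
            System.out.println(epoch.plusMonths(100));  // 1978-05-01T00:00, 'month' value 100
            System.out.println(epoch.plusYears(100));   // 2070-01-01T00:00, 'year' value 100
        }
    }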
diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
new file mode 100644
index 00000000000..27e9359b713
Binary files /dev/null and b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out differ
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
new file mode 100644
index 00000000000..a376dfc210d
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+
+suite("iceberg_schema_change_with_timetravel", 
"p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "iceberg_schema_change_with_timetravel"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}";,
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+    sql """ use test_db;""" 
+
+    def executeTimeTravelingQueries = { String tableName ->
+        def snapshots = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db.${tableName}", "query_type" = "snapshots") order by committed_at; """
+        def snapshotIds = [
+            s0: snapshots.get(0)[0],
+            s1: snapshots.get(1)[0],
+            s2: snapshots.get(2)[0],
+            s3: snapshots.get(3)[0],
+            s4: snapshots.get(4)[0]
+        ]
+
+        qt_q0 """ desc ${tableName} """
+        qt_q1 """ select * from ${tableName} order by c1 """
+        qt_q2 """ select * from ${tableName} for version as of 
${snapshotIds.s0} order by c1 """
+        qt_q3 """ select * from ${tableName} for version as of 
${snapshotIds.s1} order by c1 """
+        qt_q4 """ select * from ${tableName} for version as of 
${snapshotIds.s2} order by c1 """
+        qt_q5 """ select * from ${tableName} for version as of 
${snapshotIds.s3} order by c1 """
+        qt_q6 """ select * from ${tableName} for version as of 
${snapshotIds.s4} order by c1 """
+    }
+
+    executeTimeTravelingQueries("schema_change_with_time_travel")
+    executeTimeTravelingQueries("schema_change_with_time_travel_orc")
+
+}
+
+/*
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+alter table schema_change_with_time_travel add column c3 int; 
+insert into schema_change_with_time_travel values (4,5,6);
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+alter table schema_change_with_time_travel add column c4 int;
+*/
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
