morrySnow commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1862349495


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##########
@@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() {
         }
     }
 
+    /******************** begin code for unit test **********************/

Review Comment:
   You could use the `@VisibleForTesting` annotation instead of these banner comments.
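   A minimal sketch of the suggestion. Guava's `com.google.common.annotations.VisibleForTesting` is the usual choice; a local stand-in annotation is declared here so the sketch compiles on its own, and the class/method names are illustrative:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in for com.google.common.annotations.VisibleForTesting,
// declared so this sketch compiles without Guava on the classpath.
@Retention(RetentionPolicy.SOURCE)
@Target({ElementType.METHOD, ElementType.FIELD})
@interface VisibleForTesting {}

class IcebergExternalTableSketch {
    private Object table;

    // The annotation documents that this setter exists only for unit tests,
    // replacing the "begin/end code for unit test" banner comments.
    @VisibleForTesting
    void setTable(Object table) {
        this.table = table;
    }

    Object table() {
        return table;
    }
}
```

   Some static-analysis tools can then flag production calls to the annotated member.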



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##########
@@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() {
         }
     }
 
+    /******************** begin code for unit test **********************/
+    public void setTable(Table table) {
+        this.table = table;
+    }
+
+    public List<Column> getSchema() {
+        Schema schema = table.schema();
+        List<Types.NestedField> columns = schema.columns();
+        List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+        for (Types.NestedField field : columns) {
+            tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+                    IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+                    schema.caseInsensitiveFindField(field.name()).fieldId()));
+        }
+        return tmpSchema;
+    }
+    /******************** end code for unit test **********************/
+
     @Override
     public Optional<SchemaCacheValue> initSchema() {
-        return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name)));
+        table = IcebergUtils.getIcebergTable(catalog, dbName, name);
+        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name);
+        // List<Column> schema = getSchema();
+        Snapshot snapshot = table.currentSnapshot();
+        if (snapshot == null) {
+            LOG.info("Table {} is empty", name);

Review Comment:
   Wouldn't a debug-level log be enough here? An empty table is a normal state.
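   A sketch of the guarded debug-log pattern, using `java.util.logging` with FINE standing in for the FE logger's debug level (the class and method names are illustrative):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class EmptyTableLogSketch {
    private static final Logger LOG = Logger.getLogger("EmptyTableLogSketch");

    // An empty Iceberg table is a normal state rather than an event operators
    // must see, so it is reported at debug level, and only when that level is
    // enabled (log4j equivalent: if (LOG.isDebugEnabled()) LOG.debug(...)).
    static String describeEmpty(String tableName) {
        String msg = "Table " + tableName + " is empty";
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine(msg);
        }
        return msg;
    }
}
```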



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##########
@@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() {
         }
     }
 
+    /******************** begin code for unit test **********************/
+    public void setTable(Table table) {
+        this.table = table;
+    }
+
+    public List<Column> getSchema() {
+        Schema schema = table.schema();
+        List<Types.NestedField> columns = schema.columns();
+        List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+        for (Types.NestedField field : columns) {
+            tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+                    IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+                    schema.caseInsensitiveFindField(field.name()).fieldId()));
+        }
+        return tmpSchema;
+    }
+    /******************** end code for unit test **********************/
+
     @Override
     public Optional<SchemaCacheValue> initSchema() {
-        return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name)));
+        table = IcebergUtils.getIcebergTable(catalog, dbName, name);
+        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name);
+        // List<Column> schema = getSchema();
+        Snapshot snapshot = table.currentSnapshot();
+        if (snapshot == null) {
+            LOG.info("Table {} is empty", name);
+            return Optional.of(new IcebergSchemaCacheValue(schema, null, -1, null));
+        }
+        long snapshotId = snapshot.snapshotId();
+        PartitionSpec spec = table.spec();
+        List<Column> partitionColumns = null;
+        IcebergPartitionInfo partitionInfo = null;
+        if (isSupportedPartitionTable()) {
+            partitionColumns = Lists.newArrayList();
+            PartitionField field = spec.fields().get(0);

Review Comment:
   Add a comment explaining why `get(0)` is safe here (e.g. that `isSupportedPartitionTable()` guarantees the spec has exactly one field).



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##########
@@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() {
         }
     }
 
+    /******************** begin code for unit test **********************/
+    public void setTable(Table table) {
+        this.table = table;
+    }
+
+    public List<Column> getSchema() {
+        Schema schema = table.schema();
+        List<Types.NestedField> columns = schema.columns();
+        List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+        for (Types.NestedField field : columns) {
+            tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+                    IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+                    schema.caseInsensitiveFindField(field.name()).fieldId()));
+        }
+        return tmpSchema;
+    }
+    /******************** end code for unit test **********************/
+
     @Override
     public Optional<SchemaCacheValue> initSchema() {
-        return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name)));
+        table = IcebergUtils.getIcebergTable(catalog, dbName, name);
+        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name);
+        // List<Column> schema = getSchema();

Review Comment:
   Is this commented-out line dead code? If so, please remove it.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##########
@@ -90,4 +188,200 @@ public long fetchRowCount() {
     public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+            .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+    }
+
+    @Override
+    public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+    }
+
+    private IcebergPartitionInfo getPartitionInfoFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            return new IcebergPartitionInfo();
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return isSupportedPartitionTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
+        return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumnsFromCache();
+    }
+
+    private List<Column> getPartitionColumnsFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue
+                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+                .orElseGet(Lists::newArrayList);
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+                                               Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
+        if (icebergPartition == null) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId());
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+    }
+
+    public long getLatestSnapshotIdFromCache() throws AnalysisException {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            throw new AnalysisException("not present");

Review Comment:
   Throw an exception with a more descriptive message, e.g. naming the table and what was missing.
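   A sketch of what a more descriptive message might look like; `IllegalStateException` stands in for Doris's `AnalysisException`, and the method name and wording are illustrative:

```java
class SnapshotCacheSketch {
    // Instead of the bare "not present", the message names the table and says
    // what was missing, which makes the failure actionable from logs alone.
    static long latestSnapshotId(String db, String tbl, Long cachedSnapshotId) {
        if (cachedSnapshotId == null) {
            throw new IllegalStateException(
                    "schema cache value is not present for table " + db + "." + tbl
                            + ", failed to get the latest snapshot id");
        }
        return cachedSnapshotId;
    }
}
```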



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##########
@@ -90,4 +188,200 @@ public long fetchRowCount() {
     public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+            .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+    }
+
+    @Override
+    public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+    }
+
+    private IcebergPartitionInfo getPartitionInfoFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            return new IcebergPartitionInfo();
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return isSupportedPartitionTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
+        return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumnsFromCache();
+    }
+
+    private List<Column> getPartitionColumnsFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue
+                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+                .orElseGet(Lists::newArrayList);
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+                                               Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
+        if (icebergPartition == null) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId());
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+    }
+
+    public long getLatestSnapshotIdFromCache() throws AnalysisException {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            throw new AnalysisException("not present");
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    public boolean isSupportedPartitionTable() {
+        // TODO: Support IDENTITY transform.
+        PartitionSpec spec = table.spec();
+        if (spec == null) {
+            return false;
+        }
+        if (spec.fields().size() != 1) {
+            return false;
+        }
+        String transformName = spec.fields().get(0).transform().toString();

Review Comment:
   Why compare via `toString()`? A type-based check (e.g. `instanceof`) may be more robust.
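   A self-contained sketch of a type-based check. Note that Iceberg's concrete transform classes are package-private, so real code might instead compare against `Transforms.year()` etc. via `equals()`; the transform classes below are hypothetical stand-ins:

```java
// Hypothetical stand-ins for Iceberg's Transform hierarchy; in Iceberg itself
// the concrete transform classes are not public, so production code might
// compare with Transforms.year()/month()/day()/hour() using equals() instead.
interface Transform {}
final class YearTransform implements Transform {}
final class MonthTransform implements Transform {}
final class DayTransform implements Transform {}
final class HourTransform implements Transform {}
final class BucketTransform implements Transform {}

class TransformCheckSketch {
    // A type-based check avoids depending on the exact toString() spelling,
    // which is not part of the transform's contract.
    static boolean isSupportedTimeTransform(Transform t) {
        return t instanceof YearTransform
                || t instanceof MonthTransform
                || t instanceof DayTransform
                || t instanceof HourTransform;
    }
}
```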



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##########
@@ -90,4 +188,200 @@ public long fetchRowCount() {
     public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+            .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+    }
+
+    @Override
+    public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+    }
+
+    private IcebergPartitionInfo getPartitionInfoFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            return new IcebergPartitionInfo();
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return isSupportedPartitionTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
+        return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumnsFromCache();
+    }
+
+    private List<Column> getPartitionColumnsFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue
+                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+                .orElseGet(Lists::newArrayList);
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+                                               Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
+        if (icebergPartition == null) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId());
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+    }
+
+    public long getLatestSnapshotIdFromCache() throws AnalysisException {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            throw new AnalysisException("not present");
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    public boolean isSupportedPartitionTable() {
+        // TODO: Support IDENTITY transform.
+        PartitionSpec spec = table.spec();
+        if (spec == null) {
+            return false;
+        }
+        if (spec.fields().size() != 1) {
+            return false;
+        }
+        String transformName = spec.fields().get(0).transform().toString();
+        return YEAR.equals(transformName)
+            || MONTH.equals(transformName)
+            || DAY.equals(transformName)
+            || HOUR.equals(transformName);
+    }
+
+    protected IcebergPartitionInfo loadPartitionInfo(List<Column> partitionColumns,
+                                                     PartitionSpec spec, String transform) throws AnalysisException {
+        List<IcebergPartition> icebergPartitions = loadIcebergPartition(spec);
+        Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
+        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+        List<org.apache.doris.catalog.Type> types
+                = partitionColumns.stream().map(Column::getType).collect(Collectors.toList());
+        for (IcebergPartition partition : icebergPartitions) {
+            nameToPartition.put(partition.getPartitionValue(), partition);
+            PartitionItem item;
+            if (transform.equals(IDENTITY)) {
+                List<PartitionValue> values = Lists.newArrayList();
+                values.add(new PartitionValue(partition.getPartitionValue()));
+                PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true);
+                item = new ListPartitionItem(Lists.newArrayList(key));
+            } else {
+                Range<PartitionKey> partitionRange = getPartitionRange(Long.parseLong(partition.getPartitionValue()),
+                        transform, partitionColumns);
+                item = new RangePartitionItem(partitionRange);
+            }
+            nameToPartitionItem.put(partition.getPartitionValue(), item);
+        }
+        return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition);
+    }
+
+    protected List<IcebergPartition> loadIcebergPartition(PartitionSpec spec) {
+        PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
+                .createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
+        List<IcebergPartition> partitions = Lists.newArrayList();
+        Class<?> c = spec.javaClasses()[0];
+        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().planFiles()) {
+            for (FileScanTask task : tasks) {
+                // row format :
+                // 0. partitionData,
+                // 1. spec_id,
+                // 2. record_count,
+                // 3. file_count,
+                // 4. total_data_file_size_in_bytes,
+                // 5. position_delete_record_count,
+                // 6. position_delete_file_count,
+                // 7. equality_delete_record_count,
+                // 8. equality_delete_file_count,
+                // 9. last_updated_at,
+                // 10. last_updated_snapshot_id
+                CloseableIterable<StructLike> rows = task.asDataTask().rows();
+                for (StructLike row : rows) {
+                    StructProjection partitionData = row.get(0, StructProjection.class);
+                    String partitionValue = partitionData.get(0, c).toString();

Review Comment:
   We need to use the partition spec corresponding to each row's spec_id to generate the correct partition information.
   Use `org.apache.iceberg.Table#specs` to get all specs.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##########
@@ -90,4 +188,200 @@ public long fetchRowCount() {
     public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+            .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+    }
+
+    @Override
+    public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+    }
+
+    private IcebergPartitionInfo getPartitionInfoFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            return new IcebergPartitionInfo();
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return isSupportedPartitionTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
+        return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumnsFromCache();
+    }
+
+    private List<Column> getPartitionColumnsFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue
+                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+                .orElseGet(Lists::newArrayList);
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+                                               Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
+        if (icebergPartition == null) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId());
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+    }
+
+    public long getLatestSnapshotIdFromCache() throws AnalysisException {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            throw new AnalysisException("not present");
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    public boolean isSupportedPartitionTable() {
+        // TODO: Support IDENTITY transform.

Review Comment:
   Please also add this TODO to the PR description.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##########
@@ -90,4 +188,200 @@ public long fetchRowCount() {
     public Table getIcebergTable() {
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+            .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+    }
+
+    @Override
+    public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+    }
+
+    private IcebergPartitionInfo getPartitionInfoFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            return new IcebergPartitionInfo();
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return isSupportedPartitionTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
+        return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumnsFromCache();
+    }
+
+    private List<Column> getPartitionColumnsFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue
+                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+                .orElseGet(Lists::newArrayList);
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+                                               Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
+        if (icebergPartition == null) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId());
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+    }
+
+    public long getLatestSnapshotIdFromCache() throws AnalysisException {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            throw new AnalysisException("not present");
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    public boolean isSupportedPartitionTable() {

Review Comment:
   How is partition evolution handled? For example, before snapshot A the data is partitioned by identity, and after snapshot A it is partitioned by year.
   https://iceberg.apache.org/docs/nightly/evolution/#partition-evolution
   Iceberg's metadata structure is: metadata file (snapshots) -> manifest lists -> manifest files -> data files.
   Each snapshot contains a few manifest files, and each manifest file has its own partition spec. The table's partition spec is used only for writes.
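   One conservative way to handle evolution is to treat the table as partition-mapped only when every historical spec agrees; a self-contained sketch, with specs modelled as lists of transform names keyed by spec_id (mirroring what `Table#specs()` exposes; the class and method names are hypothetical):

```java
import java.util.List;
import java.util.Map;

class EvolutionCheckSketch {
    // specsById models org.apache.iceberg.Table#specs(): every partition spec
    // the table has ever had, keyed by spec_id. Because each manifest file
    // carries its own spec, partition metadata is only uniform when all
    // historical specs use the same single transform.
    static boolean allSpecsShareSingleTransform(Map<Integer, List<String>> specsById) {
        String seen = null;
        for (List<String> transforms : specsById.values()) {
            if (transforms.size() != 1) {
                return false;
            }
            String t = transforms.get(0);
            if (seen == null) {
                seen = t;
            } else if (!seen.equals(t)) {
                return false; // evolved, e.g. identity before snapshot A, year after
            }
        }
        return seen != null;
    }
}
```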



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
