zddr commented on code in PR #44726:
URL: https://github.com/apache/doris/pull/44726#discussion_r1862888274


##########
regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy:
##########
@@ -65,5 +65,128 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_
         sql """drop materialized view if exists ${mvName};"""
         sql """ drop catalog if exists ${catalog_name} """
     }
+
+    // Test partition refresh.
+    // Use the HMS catalog to avoid REST catalog write failures caused by the SQLite database file being locked.
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String hivePrefix = "hive2";
+        String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+        String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
+        String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}"
+        String warehouse = "${default_fs}/warehouse"
+
+        String catalog_name = "iceberg_mtmv_catalog_hms";
+        String mvName1 = "test_iceberg_mtmv_ts"
+        String mvName2 = "test_iceberg_mtmv_d"
+        String dbName = "regression_test_mtmv_partition_p0"
+        String icebergDb = "iceberg_mtmv_partition"
+        String icebergTable1 = "tstable"
+        String icebergTable2 = "dtable"
+        sql """drop catalog if exists ${catalog_name} """
+        sql """create catalog if not exists ${catalog_name} properties (
+            'type'='iceberg',
+            'iceberg.catalog.type'='hms',
+            'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+            'fs.defaultFS' = '${default_fs}',
+            'warehouse' = '${warehouse}',
+            'use_meta_cache' = 'true'
+        );"""
+
+        sql """switch internal"""
+        sql """drop database if exists ${dbName}"""
+        sql """create database if not exists ${dbName}"""
+        sql """use ${dbName}"""
+
+        sql """drop table if exists 
${catalog_name}.${icebergDb}.${icebergTable1}"""
+        sql """drop table if exists 
${catalog_name}.${icebergDb}.${icebergTable2}"""
+        sql """create database if not exists ${catalog_name}.${icebergDb}"""
+        sql """
+            CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable1} (
+              ts DATETIME,
+              value INT)
+            ENGINE=iceberg
+            PARTITION BY LIST (DAY(ts)) ();
+        """
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} 
values ('2024-10-26 01:02:03', 1), ('2024-10-27 01:02:03', 2), ('2024-10-27 
21:02:03', 3)"""
+        sql """CREATE MATERIALIZED VIEW ${mvName1} BUILD DEFERRED REFRESH AUTO 
ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2  PROPERTIES 
('replication_num' = '1') as SELECT * FROM 
${catalog_name}.${icebergDb}.${icebergTable1}"""
+        sql """REFRESH MATERIALIZED VIEW ${mvName1} complete"""
+        waitingMTMVTaskFinishedByMvName(mvName1)
+        qt_test_ts_refresh1 "select * from ${mvName1} order by value"
+
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} 
values ('2024-10-26 21:02:03', 4)"""
+        sql """REFRESH MATERIALIZED VIEW ${mvName1} complete"""
+        waitingMTMVTaskFinishedByMvName(mvName1)
+        qt_test_ts_refresh2 """select * from ${mvName1} order by value"""
+
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} 
values ('2024-10-26 01:22:03', 5), ('2024-10-27 01:12:03', 6);"""
+        sql """REFRESH MATERIALIZED VIEW ${mvName1} 
partitions(p_20241026000000_20241027000000);"""
+        waitingMTMVTaskFinishedByMvName(mvName1)
+        qt_test_ts_refresh3 """select * from ${mvName1} order by value"""
+
+        sql """REFRESH MATERIALIZED VIEW ${mvName1} complete"""

Review Comment:
   It is necessary to also test the auto refresh; otherwise the snapshot information will not be calculated.
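   For example, a rough sketch of such a case (the extra row and the qt_ label are placeholders, not part of this PR):
   ```groovy
   // Insert one more row, then refresh with AUTO so the task has to compare partition
   // snapshots and only rebuild the partitions that actually changed.
   sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-28 01:02:03', 7)"""
   sql """REFRESH MATERIALIZED VIEW ${mvName1} auto"""
   waitingMTMVTaskFinishedByMvName(mvName1)
   qt_test_ts_refresh_auto """select * from ${mvName1} order by value"""
   ```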
   



##########
regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy:
##########
@@ -65,5 +65,128 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_
         sql """drop materialized view if exists ${mvName};"""
         sql """ drop catalog if exists ${catalog_name} """
     }
+
+    // Test partition refresh.
+    // Use the HMS catalog to avoid REST catalog write failures caused by the SQLite database file being locked.
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String hivePrefix = "hive2";
+        String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+        String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
+        String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}"
+        String warehouse = "${default_fs}/warehouse"
+
+        String catalog_name = "iceberg_mtmv_catalog_hms";
+        String mvName1 = "test_iceberg_mtmv_ts"
+        String mvName2 = "test_iceberg_mtmv_d"
+        String dbName = "regression_test_mtmv_partition_p0"
+        String icebergDb = "iceberg_mtmv_partition"
+        String icebergTable1 = "tstable"
+        String icebergTable2 = "dtable"
+        sql """drop catalog if exists ${catalog_name} """
+        sql """create catalog if not exists ${catalog_name} properties (
+            'type'='iceberg',
+            'iceberg.catalog.type'='hms',
+            'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+            'fs.defaultFS' = '${default_fs}',
+            'warehouse' = '${warehouse}',
+            'use_meta_cache' = 'true'
+        );"""
+
+        sql """switch internal"""
+        sql """drop database if exists ${dbName}"""
+        sql """create database if not exists ${dbName}"""
+        sql """use ${dbName}"""
+
+        sql """drop table if exists 
${catalog_name}.${icebergDb}.${icebergTable1}"""
+        sql """drop table if exists 
${catalog_name}.${icebergDb}.${icebergTable2}"""
+        sql """create database if not exists ${catalog_name}.${icebergDb}"""
+        sql """
+            CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable1} (
+              ts DATETIME,
+              value INT)
+            ENGINE=iceberg
+            PARTITION BY LIST (DAY(ts)) ();
+        """
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} 
values ('2024-10-26 01:02:03', 1), ('2024-10-27 01:02:03', 2), ('2024-10-27 
21:02:03', 3)"""
+        sql """CREATE MATERIALIZED VIEW ${mvName1} BUILD DEFERRED REFRESH AUTO 
ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2  PROPERTIES 
('replication_num' = '1') as SELECT * FROM 
${catalog_name}.${icebergDb}.${icebergTable1}"""

Review Comment:
   It is also necessary to test the default case without a `partition by` clause; that path calculates the snapshot of the whole table.
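   For example, something along these lines (the MV name and qt_ label are illustrative placeholders):
   ```groovy
   // MV with no `partition by` clause: refresh decisions must rely on the table-level snapshot.
   sql """CREATE MATERIALIZED VIEW test_iceberg_mtmv_unpart BUILD DEFERRED REFRESH AUTO ON MANUAL DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable1}"""
   sql """REFRESH MATERIALIZED VIEW test_iceberg_mtmv_unpart auto"""
   waitingMTMVTaskFinishedByMvName("test_iceberg_mtmv_unpart")
   qt_test_unpartitioned """select * from test_iceberg_mtmv_unpart order by value"""
   sql """drop materialized view if exists test_iceberg_mtmv_unpart;"""
   ```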
   



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java:
##########
@@ -90,4 +188,200 @@ public long fetchRowCount() {
     public Table getIcebergTable() {
        return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
+
+    @Override
+    public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
+        Env.getCurrentEnv().getRefreshManager()
+            .refreshTable(getCatalog().getName(), getDbName(), getName(), true);
+    }
+
+    @Override
+    public Map<String, PartitionItem> 
getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return 
Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem());
+    }
+
+    private IcebergPartitionInfo getPartitionInfoFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            return new IcebergPartitionInfo();
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo();
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return isSupportedPartitionTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> 
snapshot) throws DdlException {
+        return 
getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumnsFromCache();
+    }
+
+    private List<Column> getPartitionColumnsFromCache() {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        return schemaCacheValue
+                .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns())
+                .orElseGet(Lists::newArrayList);
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+                                               Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
+        if (icebergPartition == null) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId());
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache());
+    }
+
+    public long getLatestSnapshotIdFromCache() throws AnalysisException {
+        makeSureInitialized();
+        Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+        if (!schemaCacheValue.isPresent()) {
+            throw new AnalysisException("not present");
+        }
+        return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId();
+    }
+
+    @Override
+    public boolean isPartitionColumnAllowNull() {
+        return true;

Review Comment:
   If the Iceberg partition field is written as NULL, which partition will the row fall into?
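   A quick way to check this in the regression test (the inserted values and qt_ label are hypothetical):
   ```groovy
   // Insert a row whose partition column is NULL, refresh, and see which MV partition it lands in.
   sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values (null, 100)"""
   sql """REFRESH MATERIALIZED VIEW ${mvName1} auto"""
   waitingMTMVTaskFinishedByMvName(mvName1)
   qt_test_null_partition """select * from ${mvName1} where value = 100"""
   ```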



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

