zddr commented on code in PR #44726: URL: https://github.com/apache/doris/pull/44726#discussion_r1862888274
########## regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy: ########## @@ -65,5 +65,128 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_ sql """drop materialized view if exists ${mvName};""" sql """ drop catalog if exists ${catalog_name} """ } + + // Test partition refresh. + // Use hms catalog to avoid rest catalog fail to write caused by sqlite database file locked. + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String hivePrefix = "hive2"; + String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") + String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}" + String warehouse = "${default_fs}/warehouse" + + String catalog_name = "iceberg_mtmv_catalog_hms"; + String mvName1 = "test_iceberg_mtmv_ts" + String mvName2 = "test_iceberg_mtmv_d" + String dbName = "regression_test_mtmv_partition_p0" + String icebergDb = "iceberg_mtmv_partition" + String icebergTable1 = "tstable" + String icebergTable2 = "dtable" + sql """drop catalog if exists ${catalog_name} """ + sql """create catalog if not exists ${catalog_name} properties ( + 'type'='iceberg', + 'iceberg.catalog.type'='hms', + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + 'fs.defaultFS' = '${default_fs}', + 'warehouse' = '${warehouse}', + 'use_meta_cache' = 'true' + );""" + + sql """switch internal""" + sql """drop database if exists ${dbName}""" + sql """create database if not exists ${dbName}""" + sql """use ${dbName}""" + + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable1}""" + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable2}""" + sql """create database if not exists ${catalog_name}.${icebergDb}""" + sql """ + CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable1} ( + ts DATETIME, + value INT) + ENGINE=iceberg + PARTITION BY LIST (DAY(ts)) (); + """ + sql """insert into 
${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:02:03', 1), ('2024-10-27 01:02:03', 2), ('2024-10-27 21:02:03', 3)""" + sql """CREATE MATERIALIZED VIEW ${mvName1} BUILD DEFERRED REFRESH AUTO ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable1}""" + sql """REFRESH MATERIALIZED VIEW ${mvName1} complete""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh1 "select * from ${mvName1} order by value" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 21:02:03', 4)""" + sql """REFRESH MATERIALIZED VIEW ${mvName1} complete""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh2 """select * from ${mvName1} order by value""" + + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:22:03', 5), ('2024-10-27 01:12:03', 6);""" + sql """REFRESH MATERIALIZED VIEW ${mvName1} partitions(p_20241026000000_20241027000000);""" + waitingMTMVTaskFinishedByMvName(mvName1) + qt_test_ts_refresh3 """select * from ${mvName1} order by value""" + + sql """REFRESH MATERIALIZED VIEW ${mvName1} complete""" Review Comment: It is necessary to test the specified auto refresh, otherwise snapshot information will not be calculated. ########## regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy: ########## @@ -65,5 +65,128 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_ sql """drop materialized view if exists ${mvName};""" sql """ drop catalog if exists ${catalog_name} """ } + + // Test partition refresh. + // Use hms catalog to avoid rest catalog fail to write caused by sqlite database file locked. 
+ if (enabled != null && enabled.equalsIgnoreCase("true")) { + String hivePrefix = "hive2"; + String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") + String default_fs = "hdfs://${externalEnvIp}:${hdfs_port}" + String warehouse = "${default_fs}/warehouse" + + String catalog_name = "iceberg_mtmv_catalog_hms"; + String mvName1 = "test_iceberg_mtmv_ts" + String mvName2 = "test_iceberg_mtmv_d" + String dbName = "regression_test_mtmv_partition_p0" + String icebergDb = "iceberg_mtmv_partition" + String icebergTable1 = "tstable" + String icebergTable2 = "dtable" + sql """drop catalog if exists ${catalog_name} """ + sql """create catalog if not exists ${catalog_name} properties ( + 'type'='iceberg', + 'iceberg.catalog.type'='hms', + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}', + 'fs.defaultFS' = '${default_fs}', + 'warehouse' = '${warehouse}', + 'use_meta_cache' = 'true' + );""" + + sql """switch internal""" + sql """drop database if exists ${dbName}""" + sql """create database if not exists ${dbName}""" + sql """use ${dbName}""" + + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable1}""" + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable2}""" + sql """create database if not exists ${catalog_name}.${icebergDb}""" + sql """ + CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable1} ( + ts DATETIME, + value INT) + ENGINE=iceberg + PARTITION BY LIST (DAY(ts)) (); + """ + sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:02:03', 1), ('2024-10-27 01:02:03', 2), ('2024-10-27 21:02:03', 3)""" + sql """CREATE MATERIALIZED VIEW ${mvName1} BUILD DEFERRED REFRESH AUTO ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable1}""" Review Comment: It is also necessary to test the default 
partition without adding the partition by statement. This will calculate the snapshot of the table ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -90,4 +188,200 @@ public long fetchRowCount() { public Table getIcebergTable() { return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName()); } + + @Override + public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + } + + @Override + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) { + return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + } + + private IcebergPartitionInfo getPartitionInfoFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new IcebergPartitionInfo(); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + } + + @Override + public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { + return isSupportedPartitionTable() ? 
PartitionType.RANGE : PartitionType.UNPARTITIONED; + } + + @Override + public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException { + return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet()); + } + + @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return getPartitionColumnsFromCache(); + } + + private List<Column> getPartitionColumnsFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue + .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns()) + .orElseGet(Lists::newArrayList); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException { + IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); + if (icebergPartition == null) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId()); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException { + return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + } + + public long getLatestSnapshotIdFromCache() throws AnalysisException { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId(); + } + + @Override + public boolean isPartitionColumnAllowNull() { + return true; Review Comment: If the Iceberg partition field is written as null, which partition will it fall into? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org