morrySnow commented on code in PR #44726: URL: https://github.com/apache/doris/pull/44726#discussion_r1862349495
########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() { } } + /******************** begin code for unit test **********************/ Review Comment: u could use annotation visableForTesting ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() { } } + /******************** begin code for unit test **********************/ + public void setTable(Table table) { + this.table = table; + } + + public List<Column> getSchema() { + Schema schema = table.schema(); + List<Types.NestedField> columns = schema.columns(); + List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); + for (Types.NestedField field : columns) { + tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT), + IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true, + schema.caseInsensitiveFindField(field.name()).fieldId())); + } + return tmpSchema; + } + /******************** end code for unit test **********************/ + @Override public Optional<SchemaCacheValue> initSchema() { - return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name))); + table = IcebergUtils.getIcebergTable(catalog, dbName, name); + List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name); + // List<Column> schema = getSchema(); + Snapshot snapshot = table.currentSnapshot(); + if (snapshot == null) { + LOG.info("Table {} is empty", name); Review Comment: debug log is enough? 
########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() { } } + /******************** begin code for unit test **********************/ + public void setTable(Table table) { + this.table = table; + } + + public List<Column> getSchema() { + Schema schema = table.schema(); + List<Types.NestedField> columns = schema.columns(); + List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); + for (Types.NestedField field : columns) { + tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT), + IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true, + schema.caseInsensitiveFindField(field.name()).fieldId())); + } + return tmpSchema; + } + /******************** end code for unit test **********************/ + @Override public Optional<SchemaCacheValue> initSchema() { - return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name))); + table = IcebergUtils.getIcebergTable(catalog, dbName, name); + List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name); + // List<Column> schema = getSchema(); + Snapshot snapshot = table.currentSnapshot(); + if (snapshot == null) { + LOG.info("Table {} is empty", name); + return Optional.of(new IcebergSchemaCacheValue(schema, null, -1, null)); + } + long snapshotId = snapshot.snapshotId(); + PartitionSpec spec = table.spec(); + List<Column> partitionColumns = null; + IcebergPartitionInfo partitionInfo = null; + if (isSupportedPartitionTable()) { + partitionColumns = Lists.newArrayList(); + PartitionField field = spec.fields().get(0); Review Comment: add comment to notice the reason of `get(0)` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -51,9 +101,57 @@ protected synchronized void makeSureInitialized() { } } + /******************** begin code for unit test 
**********************/ + public void setTable(Table table) { + this.table = table; + } + + public List<Column> getSchema() { + Schema schema = table.schema(); + List<Types.NestedField> columns = schema.columns(); + List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); + for (Types.NestedField field : columns) { + tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT), + IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true, + schema.caseInsensitiveFindField(field.name()).fieldId())); + } + return tmpSchema; + } + /******************** end code for unit test **********************/ + @Override public Optional<SchemaCacheValue> initSchema() { - return Optional.of(new SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name))); + table = IcebergUtils.getIcebergTable(catalog, dbName, name); + List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name); + // List<Column> schema = getSchema(); Review Comment: useless code? 
########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -90,4 +188,200 @@ public long fetchRowCount() { public Table getIcebergTable() { return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName()); } + + @Override + public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + } + + @Override + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) { + return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + } + + private IcebergPartitionInfo getPartitionInfoFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new IcebergPartitionInfo(); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + } + + @Override + public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { + return isSupportedPartitionTable() ? 
PartitionType.RANGE : PartitionType.UNPARTITIONED; + } + + @Override + public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException { + return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet()); + } + + @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return getPartitionColumnsFromCache(); + } + + private List<Column> getPartitionColumnsFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue + .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns()) + .orElseGet(Lists::newArrayList); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException { + IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); + if (icebergPartition == null) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId()); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException { + return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + } + + public long getLatestSnapshotIdFromCache() throws AnalysisException { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); Review Comment: throw more reasonable exception message ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -90,4 +188,200 @@ public long fetchRowCount() { public Table getIcebergTable() { return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName()); } + + @Override + public 
void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + } + + @Override + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) { + return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + } + + private IcebergPartitionInfo getPartitionInfoFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new IcebergPartitionInfo(); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + } + + @Override + public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { + return isSupportedPartitionTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED; + } + + @Override + public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException { + return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet()); + } + + @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return getPartitionColumnsFromCache(); + } + + private List<Column> getPartitionColumnsFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue + .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns()) + .orElseGet(Lists::newArrayList); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException { + IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); + if (icebergPartition == null) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId()); + } + + 
@Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException { + return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + } + + public long getLatestSnapshotIdFromCache() throws AnalysisException { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId(); + } + + @Override + public boolean isPartitionColumnAllowNull() { + return true; + } + + public boolean isSupportedPartitionTable() { + // TODO: Support IDENTITY transform. + PartitionSpec spec = table.spec(); + if (spec == null) { + return false; + } + if (spec.fields().size() != 1) { + return false; + } + String transformName = spec.fields().get(0).transform().toString(); Review Comment: why use toString? maybe instanceof is a better way? ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -90,4 +188,200 @@ public long fetchRowCount() { public Table getIcebergTable() { return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName()); } + + @Override + public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + } + + @Override + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) { + return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + } + + private IcebergPartitionInfo getPartitionInfoFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new IcebergPartitionInfo(); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + } + + @Override + public 
PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { + return isSupportedPartitionTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED; + } + + @Override + public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException { + return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet()); + } + + @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return getPartitionColumnsFromCache(); + } + + private List<Column> getPartitionColumnsFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue + .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns()) + .orElseGet(Lists::newArrayList); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException { + IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); + if (icebergPartition == null) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId()); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException { + return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + } + + public long getLatestSnapshotIdFromCache() throws AnalysisException { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId(); + } + + @Override + public boolean isPartitionColumnAllowNull() { + return true; + } + + public boolean isSupportedPartitionTable() { + // TODO: Support IDENTITY 
transform. + PartitionSpec spec = table.spec(); + if (spec == null) { + return false; + } + if (spec.fields().size() != 1) { + return false; + } + String transformName = spec.fields().get(0).transform().toString(); + return YEAR.equals(transformName) + || MONTH.equals(transformName) + || DAY.equals(transformName) + || HOUR.equals(transformName); + } + + protected IcebergPartitionInfo loadPartitionInfo(List<Column> partitionColumns, + PartitionSpec spec, String transform) throws AnalysisException { + List<IcebergPartition> icebergPartitions = loadIcebergPartition(spec); + Map<String, IcebergPartition> nameToPartition = Maps.newHashMap(); + Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap(); + List<org.apache.doris.catalog.Type> types + = partitionColumns.stream().map(Column::getType).collect(Collectors.toList()); + for (IcebergPartition partition : icebergPartitions) { + nameToPartition.put(partition.getPartitionValue(), partition); + PartitionItem item; + if (transform.equals(IDENTITY)) { + List<PartitionValue> values = Lists.newArrayList(); + values.add(new PartitionValue(partition.getPartitionValue())); + PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true); + item = new ListPartitionItem(Lists.newArrayList(key)); + } else { + Range<PartitionKey> partitionRange = getPartitionRange(Long.parseLong(partition.getPartitionValue()), + transform, partitionColumns); + item = new RangePartitionItem(partitionRange); + } + nameToPartitionItem.put(partition.getPartitionValue(), item); + } + return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition); + } + + protected List<IcebergPartition> loadIcebergPartition(PartitionSpec spec) { + PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils + .createMetadataTableInstance(table, MetadataTableType.PARTITIONS); + List<IcebergPartition> partitions = Lists.newArrayList(); + Class<?> c = spec.javaClasses()[0]; + try (CloseableIterable<FileScanTask> tasks = 
partitionsTable.newScan().planFiles()) { + for (FileScanTask task : tasks) { + // row format : + // 0. partitionData, + // 1. spec_id, + // 2. record_count, + // 3. file_count, + // 4. total_data_file_size_in_bytes, + // 5. position_delete_record_count, + // 6. position_delete_file_count, + // 7. equality_delete_record_count, + // 8. equality_delete_file_count, + // 9. last_updated_at, + // 10. last_updated_snapshot_id + CloseableIterable<StructLike> rows = task.asDataTask().rows(); + for (StructLike row : rows) { + StructProjection partitionData = row.get(0, StructProjection.class); + String partitionValue = partitionData.get(0, c).toString(); Review Comment: need to use the partition spec corresponding to the spec id to generate the correct partition information. use `org.apache.iceberg.Table#specs` to get all specs ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -90,4 +188,200 @@ public long fetchRowCount() { public Table getIcebergTable() { return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName()); } + + @Override + public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + } + + @Override + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) { + return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + } + + private IcebergPartitionInfo getPartitionInfoFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new IcebergPartitionInfo(); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + } + + @Override + public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { + return isSupportedPartitionTable() ? 
PartitionType.RANGE : PartitionType.UNPARTITIONED; + } + + @Override + public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException { + return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet()); + } + + @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return getPartitionColumnsFromCache(); + } + + private List<Column> getPartitionColumnsFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue + .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns()) + .orElseGet(Lists::newArrayList); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException { + IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); + if (icebergPartition == null) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId()); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException { + return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + } + + public long getLatestSnapshotIdFromCache() throws AnalysisException { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId(); + } + + @Override + public boolean isPartitionColumnAllowNull() { + return true; + } + + public boolean isSupportedPartitionTable() { + // TODO: Support IDENTITY transform. 
Review Comment: add TODO in PR's desc ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java: ########## @@ -90,4 +188,200 @@ public long fetchRowCount() { public Table getIcebergTable() { return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName()); } + + @Override + public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { + Env.getCurrentEnv().getRefreshManager() + .refreshTable(getCatalog().getName(), getDbName(), getName(), true); + } + + @Override + public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) { + return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + } + + private IcebergPartitionInfo getPartitionInfoFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + return new IcebergPartitionInfo(); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + } + + @Override + public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) { + return isSupportedPartitionTable() ? 
PartitionType.RANGE : PartitionType.UNPARTITIONED; + } + + @Override + public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException { + return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet()); + } + + @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return getPartitionColumnsFromCache(); + } + + private List<Column> getPartitionColumnsFromCache() { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue + .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns()) + .orElseGet(Lists::newArrayList); + } + + @Override + public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, + Optional<MvccSnapshot> snapshot) throws AnalysisException { + IcebergPartition icebergPartition = getPartitionInfoFromCache().getNameToPartition().get(partitionName); + if (icebergPartition == null) { + throw new AnalysisException("can not find partition: " + partitionName); + } + return new MTMVVersionSnapshot(icebergPartition.getLastSnapshotId()); + } + + @Override + public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) + throws AnalysisException { + return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); + } + + public long getLatestSnapshotIdFromCache() throws AnalysisException { + makeSureInitialized(); + Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); + if (!schemaCacheValue.isPresent()) { + throw new AnalysisException("not present"); + } + return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId(); + } + + @Override + public boolean isPartitionColumnAllowNull() { + return true; + } + + public boolean isSupportedPartitionTable() { Review Comment: how to process partition evolution? for example, before snapshot A, data partitioned by identify. 
after snapshot A, data is partitioned by year. https://iceberg.apache.org/docs/nightly/evolution/#partition-evolution Iceberg's metadata structure is: metadata file (snapshots) -> manifest lists -> manifest files -> data files. Each snapshot contains a few manifest files, and each manifest file has its own partition spec. The table's partition spec is used only for writes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org