This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 02306b475bb [fix](iceberg)Use the correct schema for query (#50376)
02306b475bb is described below

commit 02306b475bb82df7cc496e9bb7d71fd1ccfad635
Author: wuwenchi <wuwen...@selectdb.com>
AuthorDate: Tue Apr 29 13:42:14 2025 +0800

    [fix](iceberg)Use the correct schema for query (#50376)

    ### What problem does this PR solve?

    Follow-up to #49956

    Problem Summary:

    When a snapshot is specified in the query, the schema corresponding to
    that snapshot should be used for parsing; otherwise the schema of the
    latest snapshot should be used.

    1. When using the HMS catalog type, the executor pool must also be initialized.
    2. Set the thread pool size equal to the number of cores on the current machine.
    3. When no snapshot is specified, use the latest schema.
    4. When a snapshot is specified, use the schema corresponding to that snapshot.
    5. When generating a scan node, save the schema information instead of
       fetching it from the cache, so that a cache refresh cannot change the
       schema in the middle of a query.
    6. When refreshing the schema, refresh all schemas of related tables.
---
 .../create_preinstalled_scripts/iceberg/run09.sql  |  37 ++
 .../java/org/apache/doris/catalog/TableIf.java     |  11 -
 .../apache/doris/datasource/ExternalCatalog.java   |   2 +
 .../doris/datasource/ExternalSchemaCache.java      |   8 +-
 .../apache/doris/datasource/FileQueryScanNode.java |   9 +-
 .../org/apache/doris/datasource/FileScanNode.java  |   2 +-
 .../apache/doris/datasource/hive/HMSDlaTable.java  |   9 +
 .../doris/datasource/hive/HMSExternalCatalog.java  |   6 +
 .../doris/datasource/hive/HMSExternalTable.java    |  25 +-
 .../doris/datasource/hive/IcebergDlaTable.java     | 147 ++++++++
 .../datasource/iceberg/IcebergExternalCatalog.java |   1 -
 .../datasource/iceberg/IcebergExternalTable.java   | 350 ++-----------------
 .../datasource/iceberg/IcebergMetadataCache.java   |  17 +-
 .../doris/datasource/iceberg/IcebergUtils.java     | 375 ++++++++++++++++++++-
 .../datasource/iceberg/source/IcebergScanNode.java |  14 +-
 .../java/org/apache/doris/planner/ScanNode.java    |  10 +-
 .../iceberg/IcebergExternalTableTest.java          |  38 +--
 .../test_iceberg_schema_change_with_timetravel.out | Bin 0 -> 691 bytes
 ...st_iceberg_schema_change_with_timetravel.groovy |  87 +++++
 19 files changed, 746 insertions(+), 402 deletions(-)

diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
new file mode 100644
index 00000000000..c5795883c4d
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run09.sql
@@ -0,0 +1,37 @@
+use demo.test_db;
+
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+
+alter table schema_change_with_time_travel add column c3 int;
+insert into schema_change_with_time_travel values (4,5,6);
+
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+
+alter table schema_change_with_time_travel add column c4 int;
+
+
+create table schema_change_with_time_travel_orc (c1 int) tblproperties ("write.format.default"="orc");
+insert into schema_change_with_time_travel_orc values (1);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (2,3);
+
+alter table schema_change_with_time_travel_orc add column c3 int;
+insert into schema_change_with_time_travel_orc values (4,5,6);
+
+alter table schema_change_with_time_travel_orc drop column c2;
+insert into schema_change_with_time_travel_orc values (7,8);
+
+alter table schema_change_with_time_travel_orc add column c2 int;
+insert into schema_change_with_time_travel_orc values (9,10,11);
+
+alter table schema_change_with_time_travel_orc add column c4 int;
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 96fd8da54af..74da9a88309 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -140,17 +140,6 @@ public interface TableIf {
 
     Column getColumn(String name);
 
-    default int getBaseColumnIdxByName(String colName) {
-        int i = 0;
-        for (Column col : getBaseSchema()) {
-            if (col.getName().equalsIgnoreCase(colName)) {
-                return i;
-            }
-            ++i;
-        }
-        return -1;
-    }
-
     String getMysqlType();
 
     String getEngine();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 52ac5a28bda..d9c620ee0bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -125,6 +125,8 @@ public abstract class ExternalCatalog
             CREATE_TIME,
             USE_META_CACHE);
 
+    protected static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = Runtime.getRuntime().availableProcessors();
+
     // Unique id of this catalog, will be assigned after catalog is loaded.
     @SerializedName(value = "id")
     protected long id;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index de3eeff75d9..d0673da56fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -96,11 +96,9 @@ public class ExternalSchemaCache {
     }
 
     public void invalidateTableCache(String dbName, String tblName) {
-        SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
-        schemaCache.invalidate(key);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("invalid schema cache for {}.{} in catalog {}", dbName, tblName, catalog.getName());
-        }
+        schemaCache.asMap().keySet().stream()
+                .filter(key -> key.dbName.equals(dbName) && key.tblName.equals(tblName))
+                .forEach(schemaCache::invalidate);
     }
 
     public void invalidateDbCache(String dbName) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index c3351600fc6..6e852940cec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -251,7 +251,14 @@ public abstract class FileQueryScanNode extends FileScanNode {
             }
             SlotDescriptor slotDesc = desc.getSlot(slot.getSlotId());
             String colName = slotDesc.getColumn().getName();
-            int idx = tbl.getBaseColumnIdxByName(colName);
+            int idx = -1;
+            List<Column> columns = getColumns();
+            for (int i = 0; i < columns.size(); i++) {
+                if (columns.get(i).getName().equals(colName)) {
+                    idx = i;
+                    break;
+                }
+            }
             if (idx == -1) {
                 throw new UserException("Column " + colName + " not found in table " + tbl.getName());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 8d3aeaa6a26..d2c6230ba6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -193,7 +193,7 @@ public abstract class FileScanNode extends ExternalScanNode {
         TExpr tExpr = new TExpr();
         tExpr.setNodes(Lists.newArrayList());
 
-        for (Column column : tbl.getBaseSchema()) {
+        for (Column column : getColumns()) {
             Expr expr;
             if (column.getDefaultValue() != null) {
                 if (column.getDefaultValueExprDef() != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
index 1710646ce3d..d5316aa7fdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSDlaTable.java
@@ -76,4 +76,13 @@ public abstract class HMSDlaTable implements MTMVBaseTableIf {
         Env.getCurrentEnv().getRefreshManager()
                 .refreshTable(hmsTable.getCatalog().getName(), hmsTable.getDbName(), hmsTable.getName(), true);
     }
+
+    /**
+     * Whether the table is supported as a related table.
+     * For example, an Iceberg table may become unsupported after partition evolution.
+     * @return true if the table can be used as a related table
+     */
+    protected boolean isValidRelatedTable() {
+        return true;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 0ec5153a415..9b506d917f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -187,6 +187,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
                     String.valueOf(Config.hive_metastore_client_timeout_second));
         }
         HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
+        threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+                ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+                Integer.MAX_VALUE,
+                String.format("hms_iceberg_catalog_%s_executor_pool", name),
+                true,
+                preExecutionAuthenticator);
         FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
                 this.bindBrokerName(), this.catalogProperty.getHadoopProperties());
         this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 1d520fc4178..cfd21534db0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -40,6 +40,8 @@ import org.apache.doris.datasource.hudi.HudiMvccSnapshot;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheKey;
 import org.apache.doris.datasource.hudi.HudiSchemaCacheValue;
 import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.iceberg.IcebergMvccSnapshot;
+import org.apache.doris.datasource.iceberg.IcebergSchemaCacheKey;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.datasource.mvcc.EmptyMvccSnapshot;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
@@ -211,7 +213,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
             } else {
                 if (supportedIcebergTable()) {
                     dlaType = DLAType.ICEBERG;
-                    dlaTable = new HiveDlaTable(this);
+                    dlaTable = new IcebergDlaTable(this);
                 } else if (supportedHoodieTable()) {
                     dlaType = DLAType.HUDI;
                     dlaTable = new HudiDlaTable(this);
@@ -315,6 +317,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         if (getDlaType() == DLAType.HUDI) {
             return ((HudiDlaTable) dlaTable).getHudiSchemaCacheValue(MvccUtil.getSnapshotFromContext(this))
                     .getSchema();
+        } else if (getDlaType() == DLAType.ICEBERG) {
+            return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(), getName());
         }
         Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(dbName, name);
         return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
@@ -619,7 +623,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
     public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
         makeSureInitialized();
         if (dlaType.equals(DLAType.ICEBERG)) {
-            return getIcebergSchema();
+            return getIcebergSchema(key);
         } else if (dlaType.equals(DLAType.HUDI)) {
             return getHudiSchema(key);
         } else {
@@ -627,10 +631,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         }
     }
 
-    private Optional<SchemaCacheValue> getIcebergSchema() {
-        List<Column> columns = IcebergUtils.getSchema(catalog, dbName, name);
-        List<Column> partitionColumns = initPartitionColumns(columns);
-        return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
+    private Optional<SchemaCacheValue> getIcebergSchema(SchemaCacheKey key) {
+        return IcebergUtils.loadSchemaCacheValue(catalog, dbName, name, ((IcebergSchemaCacheKey) key).getSchemaId());
     }
 
     private Optional<SchemaCacheValue> getHudiSchema(SchemaCacheKey key) {
@@ -1085,8 +1087,12 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
     public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
         if (getDlaType() == DLAType.HUDI) {
             return new HudiMvccSnapshot(HudiUtils.getPartitionValues(tableSnapshot, this));
+        } else if (getDlaType() == DLAType.ICEBERG) {
+            return new IcebergMvccSnapshot(
+                    IcebergUtils.getIcebergSnapshotCacheValue(tableSnapshot, getCatalog(), getDbName(), getName()));
+        } else {
+            return new EmptyMvccSnapshot();
         }
-        return new EmptyMvccSnapshot();
     }
 
     public boolean firstColumnIsString() {
@@ -1107,4 +1113,9 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
                 getRemoteTable().getSd().getLocation(), getCatalog().getConfiguration());
     }
+
+    public boolean isValidRelatedTable() {
+        makeSureInitialized();
+        return dlaTable.isValidRelatedTable();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
new file mode 100644
index 00000000000..36b871282a9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/IcebergDlaTable.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.iceberg.IcebergSchemaCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergSnapshotCacheValue;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.mtmv.MTMVRefreshContext;
+import org.apache.doris.mtmv.MTMVSnapshotIdSnapshot;
+import org.apache.doris.mtmv.MTMVSnapshotIf;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class IcebergDlaTable extends HMSDlaTable {
+
+    private boolean isValidRelatedTableCached = false;
+    private boolean isValidRelatedTable = false;
+
+    public IcebergDlaTable(HMSExternalTable table) {
+        super(table);
+    }
+
+    @Override
+    public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
+        return Maps.newHashMap(
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName())
+                        .getPartitionInfo().getNameToPartitionItem());
+    }
+
+    @Override
+    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
+        return isValidRelatedTable() ? PartitionType.RANGE : PartitionType.UNPARTITIONED;
+    }
+
+    @Override
+    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
+        return getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet());
+    }
+
+    @Override
+    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
+        IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+                hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName(),
+                snapshotValue.getSnapshot().getSchemaId());
+        return schemaValue.getPartitionColumns();
+    }
+
+    @Override
+    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
+            Optional<MvccSnapshot> snapshot) throws AnalysisException {
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
+        long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
+        if (latestSnapshotId <= 0) {
+            throw new AnalysisException("can not find partition: " + partitionName);
+        }
+        return new MTMVSnapshotIdSnapshot(latestSnapshotId);
+    }
+
+    @Override
+    public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
+            throws AnalysisException {
+        hmsTable.makeSureInitialized();
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(
+                        snapshot, hmsTable.getCatalog(), hmsTable.getDbName(), hmsTable.getName());
+        return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
+    }
+
+    @Override
+    boolean isPartitionColumnAllowNull() {
+        return true;
+    }
+
+    @Override
+    protected boolean isValidRelatedTable() {
+        if (isValidRelatedTableCached) {
+            return isValidRelatedTable;
+        }
+        isValidRelatedTable = false;
+        Set<String> allFields = Sets.newHashSet();
+        Table table = IcebergUtils.getIcebergTable(
+                hmsTable.getCatalog(),
+                hmsTable.getDbName(),
+                hmsTable.getName()
+        );
+        for (PartitionSpec spec : table.specs().values()) {
+            if (spec == null) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            List<PartitionField> fields = spec.fields();
+            if (fields.size() != 1) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            PartitionField partitionField = spec.fields().get(0);
+            String transformName = partitionField.transform().toString();
+            if (!IcebergUtils.YEAR.equals(transformName)
+                    && !IcebergUtils.MONTH.equals(transformName)
+                    && !IcebergUtils.DAY.equals(transformName)
+                    && !IcebergUtils.HOUR.equals(transformName)) {
+                isValidRelatedTableCached = true;
+                return false;
+            }
+            allFields.add(table.schema().findColumnName(partitionField.sourceId()));
+        }
+        isValidRelatedTableCached = true;
+        isValidRelatedTable = allFields.size() == 1;
+        return isValidRelatedTable;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 9bb7ca8ae08..225e14af420 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -47,7 +47,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     protected String icebergCatalogType;
     protected Catalog catalog;
-    private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16;
 
     public IcebergExternalCatalog(long catalogId, String name, String comment) {
         super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index 16964ead1d2..2e8cdc63196 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -17,25 +17,18 @@
 
 package org.apache.doris.datasource.iceberg;
 
-import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.PartitionItem;
-import org.apache.doris.catalog.PartitionKey;
 import org.apache.doris.catalog.PartitionType;
-import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
-import org.apache.doris.datasource.CacheException;
-import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
 import org.apache.doris.datasource.mvcc.MvccSnapshot;
 import org.apache.doris.datasource.mvcc.MvccTable;
-import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.mtmv.MTMVBaseTableIf;
 import org.apache.doris.mtmv.MTMVRefreshContext;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
@@ -50,32 +43,12 @@ import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataTableType;
-import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.PartitionsTable;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructProjection;
 
-import java.io.IOException;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.Month;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -85,15 +58,7 @@ import java.util.stream.Collectors;
 
 public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {
 
-    public static final String YEAR = "year";
-    public static final String MONTH = "month";
-    public static final String DAY = "day";
-    public static final String HOUR = "hour";
-    public static final String IDENTITY = "identity";
-    public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec
-
     private Table table;
-    private List<Column> partitionColumns;
     private boolean isValidRelatedTableCached = false;
     private boolean isValidRelatedTable = false;
 
@@ -118,29 +83,9 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
         this.table = table;
     }
 
-    @VisibleForTesting
-    public void setPartitionColumns(List<Column> partitionColumns) {
-        this.partitionColumns = partitionColumns;
-    }
-
     @Override
     public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
-        table = getIcebergTable();
-        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, name,
-                ((IcebergSchemaCacheKey) key).getSchemaId());
-        List<Column> tmpColumns = Lists.newArrayList();
-        PartitionSpec spec = table.spec();
-        for (PartitionField field : spec.fields()) {
-            Types.NestedField col = table.schema().findField(field.sourceId());
-            for (Column c : schema) {
-                if (c.getName().equalsIgnoreCase(col.name())) {
-                    tmpColumns.add(c);
-                    break;
-                }
-            }
-        }
-        partitionColumns = tmpColumns;
-        return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns));
+        return IcebergUtils.loadSchemaCacheValue(catalog, dbName, name, ((IcebergSchemaCacheKey) key).getSchemaId());
     }
 
     @Override
@@ -178,23 +123,21 @@ public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTa
         return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
     }
 
-    private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() {
-        return Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
-                .getSnapshotCache(catalog, dbName, name);
-    }
-
     @Override
     public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
     }
 
     @Override
     public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem());
+        return Maps.newHashMap(
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName())
+                        .getPartitionInfo().getNameToPartitionItem());
     }
 
     @Override
     public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
-        return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
+        return IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName())
+                .getPartitionInfo().getNameToPartitionItem();
     }
 
     @Override
@@ -209,15 +152,18 @@
 
     @Override
     public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
-        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
-        IcebergSchemaCacheValue schemaValue = getIcebergSchemaCacheValue(snapshotValue.getSnapshot().getSchemaId());
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName());
+        IcebergSchemaCacheValue schemaValue = IcebergUtils.getSchemaCacheValue(
+                catalog, getDbName(), getName(), snapshotValue.getSnapshot().getSchemaId());
         return schemaValue.getPartitionColumns();
     }
 
     @Override
     public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
             Optional<MvccSnapshot> snapshot) throws AnalysisException {
-        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName());
         long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName);
         if (latestSnapshotId <= 0) {
             throw new AnalysisException("can not find partition: " + partitionName);
@@ -229,7 +175,8 @@
     public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
             throws AnalysisException {
         makeSureInitialized();
-        IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot);
+        IcebergSnapshotCacheValue snapshotValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshot, getCatalog(), getDbName(), getName());
         return new MTMVSnapshotIdSnapshot(snapshotValue.getSnapshot().getSnapshotId());
     }
 
@@ -264,10 +211,10 @@
             }
             PartitionField partitionField = spec.fields().get(0);
             String transformName = partitionField.transform().toString();
-            if (!YEAR.equals(transformName)
-                    && !MONTH.equals(transformName)
-                    && !DAY.equals(transformName)
-                    && !HOUR.equals(transformName)) {
+            if (!IcebergUtils.YEAR.equals(transformName)
+                    && !IcebergUtils.MONTH.equals(transformName)
+                    && !IcebergUtils.DAY.equals(transformName)
+                    && !IcebergUtils.HOUR.equals(transformName)) {
                 isValidRelatedTableCached = true;
                 return false;
             }
@@ -280,27 +227,13 @@
 
     @Override
     public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot) {
-        return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue());
-    }
-
-    public long getLatestSnapshotId() {
-        table = getIcebergTable();
-        Snapshot snapshot = table.currentSnapshot();
-        return snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : table.currentSnapshot().snapshotId();
-    }
-
-    public long getSchemaId(long snapshotId) {
-        table = getIcebergTable();
-        return snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID
-                ? IcebergUtils.UNKNOWN_SNAPSHOT_ID
-                : table.snapshot(snapshotId).schemaId();
+        return new IcebergMvccSnapshot(IcebergUtils.getIcebergSnapshotCacheValue(
+                tableSnapshot, getCatalog(), getDbName(), getName()));
     }
 
     @Override
     public List<Column> getFullSchema() {
-        Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(this);
-        IcebergSnapshotCacheValue cacheValue = getOrFetchSnapshotCacheValue(snapshotFromContext);
-        return getIcebergSchemaCacheValue(cacheValue.getSnapshot().getSchemaId()).getSchema();
+        return IcebergUtils.getIcebergSchema(this, getCatalog(), getDbName(), getName());
     }
 
     @Override
@@ -308,239 +241,6 @@
         return true;
     }
 
-    public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long schemaId) {
-        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
-        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
-                new IcebergSchemaCacheKey(dbName, name, schemaId));
-        if (!schemaCacheValue.isPresent()) {
-            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
-                    null, catalog.getName(), dbName, name, schemaId);
-        }
-        return (IcebergSchemaCacheValue) schemaCacheValue.get();
-    }
-
-    public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws AnalysisException {
-        // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table, haven't contained any snapshot yet.
-        if (!isValidRelatedTable() || snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
-            return IcebergPartitionInfo.empty();
-        }
-        List<IcebergPartition> icebergPartitions = loadIcebergPartition(snapshotId);
-        Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
-        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
-        table = getIcebergTable();
-        partitionColumns = getIcebergSchemaCacheValue(table.snapshot(snapshotId).schemaId()).getPartitionColumns();
-        for (IcebergPartition partition : icebergPartitions) {
-            nameToPartition.put(partition.getPartitionName(), partition);
-            String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
-            Range<PartitionKey> partitionRange = getPartitionRange(
-                    partition.getPartitionValues().get(0), transform, partitionColumns);
-            PartitionItem item = new RangePartitionItem(partitionRange);
-            nameToPartitionItem.put(partition.getPartitionName(), item);
-        }
-        Map<String, Set<String>> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem);
-        return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap);
-    }
-
-    public List<IcebergPartition> loadIcebergPartition(long snapshotId) {
-        PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
-                .createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
-        List<IcebergPartition> partitions = Lists.newArrayList();
-        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
-            for (FileScanTask task : tasks) {
-                CloseableIterable<StructLike> rows = task.asDataTask().rows();
-                for (StructLike row : rows) {
-                    partitions.add(generateIcebergPartition(row));
-                }
-            }
-        } catch (IOException e) {
-            LOG.warn("Failed to get Iceberg table {} partition info.", name, e);
-        }
-        return partitions;
-    }
-
-    public IcebergPartition generateIcebergPartition(StructLike row) {
-        // row format :
-        //  0. partitionData,
-        //  1. spec_id,
-        //  2. record_count,
-        //  3. file_count,
-        //  4. total_data_file_size_in_bytes,
-        //  5. position_delete_record_count,
-        //  6. position_delete_file_count,
-        //  7. equality_delete_record_count,
-        //  8. equality_delete_file_count,
-        //  9. last_updated_at,
-        //  10. last_updated_snapshot_id
-        table = getIcebergTable();
-        Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partition table.");
-        int specId = row.get(1, Integer.class);
-        PartitionSpec partitionSpec = table.specs().get(specId);
-        StructProjection partitionData = row.get(0, StructProjection.class);
-        StringBuilder sb = new StringBuilder();
-        List<String> partitionValues = Lists.newArrayList();
-        List<String> transforms = Lists.newArrayList();
-        for (int i = 0; i < partitionSpec.fields().size(); ++i) {
-            PartitionField partitionField = partitionSpec.fields().get(i);
-            Class<?> fieldClass = partitionSpec.javaClasses()[i];
-            int fieldId = partitionField.fieldId();
-            // Iceberg partition field id starts at PARTITION_DATA_ID_START,
-            // So we can get the field index in partitionData using fieldId - PARTITION_DATA_ID_START
-            int index = fieldId - PARTITION_DATA_ID_START;
-            Object o = partitionData.get(index, fieldClass);
-            String fieldValue = o == null ? null : o.toString();
-            String fieldName = partitionField.name();
-            sb.append(fieldName);
-            sb.append("=");
-            sb.append(fieldValue);
-            sb.append("/");
-            partitionValues.add(fieldValue);
-            transforms.add(partitionField.transform().toString());
-        }
-        if (sb.length() > 0) {
-            sb.delete(sb.length() - 1, sb.length());
-        }
-        String partitionName = sb.toString();
-        long recordCount = row.get(2, Long.class);
-        long fileCount = row.get(3, Integer.class);
-        long fileSizeInBytes = row.get(4, Long.class);
-        long lastUpdateTime = row.get(9, Long.class);
-        long lastUpdateSnapShotId = row.get(10, Long.class);
-        return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount,
-                lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
-    }
-
-    @VisibleForTesting
-    public Range<PartitionKey> getPartitionRange(String value, String transform, List<Column> partitionColumns)
-            throws AnalysisException {
-        // For NULL value, create a minimum partition for it.
-        if (value == null) {
-            PartitionKey nullLowKey = PartitionKey.createPartitionKey(
-                    Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns);
-            PartitionKey nullUpKey = nullLowKey.successor();
-            return Range.closedOpen(nullLowKey, nullUpKey);
-        }
-        LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
-        LocalDateTime target;
-        LocalDateTime lower;
-        LocalDateTime upper;
-        long longValue = Long.parseLong(value);
-        switch (transform) {
-            case HOUR:
-                target = epoch.plusHours(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(),
-                        target.getHour(), 0, 0);
-                upper = lower.plusHours(1);
-                break;
-            case DAY:
-                target = epoch.plusDays(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0);
-                upper = lower.plusDays(1);
-                break;
-            case MONTH:
-                target = epoch.plusMonths(longValue);
-                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0);
-                upper = lower.plusMonths(1);
-                break;
-            case YEAR:
-                target = epoch.plusYears(longValue);
-                lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0);
-                upper = lower.plusYears(1);
-                break;
-            default:
-                throw new RuntimeException("Unsupported transform " + transform);
-        }
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-        Column c = partitionColumns.get(0);
-        Preconditions.checkState(c.getDataType().isDateType(), "Only support date type partition column");
-        if (c.getType().isDate() || c.getType().isDateV2()) {
-            formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
-        }
-        PartitionValue lowerValue = new PartitionValue(lower.format(formatter));
-        PartitionValue upperValue = new PartitionValue(upper.format(formatter));
-        PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns);
-        PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns);
-        return Range.closedOpen(lowKey, upperKey);
-    }
-
-    /**
-     * Merge overlapped iceberg partitions into one Doris partition.
-     */
-    public Map<String, Set<String>> mergeOverlapPartitions(Map<String, PartitionItem> originPartitions) {
-        List<Map.Entry<String, PartitionItem>> entries = sortPartitionMap(originPartitions);
-        Map<String, Set<String>> map = Maps.newHashMap();
-        for (int i = 0; i < entries.size() - 1; i++) {
-            Range<PartitionKey> firstValue = entries.get(i).getValue().getItems();
-            String firstKey = entries.get(i).getKey();
-            Range<PartitionKey> secondValue = entries.get(i + 1).getValue().getItems();
-            String secondKey = entries.get(i + 1).getKey();
-            // If the first entry enclose the second one, remove the second entry and keep a record in the return map.
-            // So we can track the iceberg partitions those contained by one Doris partition.
-            while (i < entries.size() && firstValue.encloses(secondValue)) {
-                originPartitions.remove(secondKey);
-                map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
-                String finalSecondKey = secondKey;
-                map.computeIfPresent(firstKey, (key, value) -> {
-                    value.add(finalSecondKey);
-                    return value;
-                });
-                i++;
-                if (i >= entries.size() - 1) {
-                    break;
-                }
-                secondValue = entries.get(i + 1).getValue().getItems();
-                secondKey = entries.get(i + 1).getKey();
-            }
-        }
-        return map;
-    }
-
-    /**
-     * Sort the given map entries by PartitionItem Range(LOW, HIGH)
-     * When comparing two ranges, the one with smaller LOW value is smaller than the other one.
-     * If two ranges have same values of LOW, the one with larger HIGH value is smaller.
-     *
-     * For now, we only support year, month, day and hour,
-     * so it is impossible to have two partially intersect partitions.
-     * One range is either enclosed by another or has no intersection at all with another.
-     *
-     * For example, we have these 4 ranges:
-     * [10, 20), [30, 40), [0, 30), [10, 15)
-     *
-     * After sort, they become:
-     * [0, 30), [10, 20), [10, 15), [30, 40)
-     */
-    public List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String, PartitionItem> originPartitions) {
-        List<Map.Entry<String, PartitionItem>> entries = new ArrayList<>(originPartitions.entrySet());
-        entries.sort(new RangeComparator());
-        return entries;
-    }
-
-    public static class RangeComparator implements Comparator<Map.Entry<String, PartitionItem>> {
-        @Override
-        public int compare(Map.Entry<String, PartitionItem> p1, Map.Entry<String, PartitionItem> p2) {
-            PartitionItem value1 = p1.getValue();
-            PartitionItem value2 = p2.getValue();
-            if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) {
-                Range<PartitionKey> items1 = value1.getItems();
-                Range<PartitionKey> items2 = value2.getItems();
-                if (!items1.hasLowerBound()) {
-                    return -1;
-                }
-                if (!items2.hasLowerBound()) {
-                    return 1;
-                }
-                PartitionKey upper1 = items1.upperEndpoint();
-                PartitionKey lower1 = items1.lowerEndpoint();
-                PartitionKey upper2 = items2.upperEndpoint();
-                PartitionKey lower2 = items2.lowerEndpoint();
-                int compareLow = lower1.compareTo(lower2);
-                return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
-            }
-            return 0;
-        }
-    }
-
     @VisibleForTesting
     public boolean isValidRelatedTableCached() {
         return isValidRelatedTableCached;
@@ -554,12 +254,4 @@
     public void setIsValidRelatedTableCached(boolean isCached) {
         this.isValidRelatedTableCached = isCached;
     }
-
-    public IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
-        if (snapshot.isPresent()) {
-            return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
-        } else {
-            return getIcebergSnapshotCacheValue();
-        }
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index f3daf2d2795..57c5eb20c64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.thrift.TIcebergMetadataParams;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -136,12 +137,18 @@ public class IcebergMetadataCache {
 
     @NotNull
     private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey key) throws AnalysisException {
-        IcebergExternalTable table = (IcebergExternalTable) key.catalog.getDbOrAnalysisException(key.dbName)
+        MTMVRelatedTableIf table = (MTMVRelatedTableIf) key.catalog.getDbOrAnalysisException(key.dbName)
                 .getTableOrAnalysisException(key.tableName);
-        long snapshotId = table.getLatestSnapshotId();
-        long schemaId = table.getSchemaId(snapshotId);
-        IcebergPartitionInfo icebergPartitionInfo = table.loadPartitionInfo(snapshotId);
-        return new IcebergSnapshotCacheValue(icebergPartitionInfo, new IcebergSnapshot(snapshotId, schemaId));
+        IcebergSnapshot lastedIcebergSnapshot = IcebergUtils.getLastedIcebergSnapshot(
+                (ExternalCatalog) key.catalog, key.dbName, key.tableName);
+        IcebergPartitionInfo icebergPartitionInfo;
+        if (!table.isValidRelatedTable()) {
+            icebergPartitionInfo = IcebergPartitionInfo.empty();
+        } else {
+            icebergPartitionInfo = IcebergUtils.loadPartitionInfo(
+                    (ExternalCatalog) key.catalog, key.dbName, key.tableName, lastedIcebergSnapshot.getSnapshotId());
+        }
+        return new IcebergSnapshotCacheValue(icebergPartitionInfo, lastedIcebergSnapshot);
     }
 
     public void invalidateCatalogCache(long catalogId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index a3cbfec688e..f1d6c4eb033 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -31,37 +31,58 @@ import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.PartitionDesc;
+import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.Subquery;
+import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MapType;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.RangePartitionItem;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionsTable;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.And;
@@ -77,13 +98,25 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.LocationUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -114,7 +147,15 @@ public class IcebergUtils {
     // nickname in spark
     public static final String SPARK_SQL_COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";
 
-    public static final long UNKNOWN_SNAPSHOT_ID = -1;
+    public static final long UNKNOWN_SNAPSHOT_ID = -1; // means an empty table
+    public static final long NEWEST_SCHEMA_ID = -1;
+
+    public static final String YEAR = "year";
+    public static final String MONTH = "month";
+    public static final String DAY = "day";
+    public static final String HOUR = "hour";
+    public static final String IDENTITY = "identity";
+    public static final int PARTITION_DATA_ID_START = 1000; // org.apache.iceberg.PartitionSpec
 
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
@@ -579,10 +620,6 @@
                 : metadataCache.getIcebergTable(catalog, dbName, tblName);
     }
 
-    public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
-        return getSchema(catalog, dbName, name, UNKNOWN_SNAPSHOT_ID);
-    }
-
     /**
      * Get iceberg schema from catalog and convert them to doris schema
     */
@@ -591,7 +628,7 @@
         return catalog.getPreExecutionAuthenticator().execute(() -> {
             org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name);
             Schema schema;
-            if (schemaId == UNKNOWN_SNAPSHOT_ID || icebergTable.currentSnapshot() == null) {
+            if (schemaId == NEWEST_SCHEMA_ID || icebergTable.currentSnapshot() == null) {
                 schema = icebergTable.schema();
             } else {
                 schema = icebergTable.schemas().get((int) schemaId);
@@ -744,4 +781,330 @@
         return matchingManifests;
     }
+
+    // get snapshot id from a query like 'for version/time as of'
+    public static long getQuerySpecSnapshot(Table table, TableSnapshot queryTableSnapshot) {
+        TableSnapshot.VersionType type = queryTableSnapshot.getType();
+        if (type == TableSnapshot.VersionType.VERSION) {
+            return queryTableSnapshot.getVersion();
+        } else {
+            long timestamp = TimeUtils.timeStringToLong(queryTableSnapshot.getTime(), TimeUtils.getTimeZone());
+            if (timestamp < 0) {
+                throw new DateTimeException("can't parse time: " + queryTableSnapshot.getTime());
+            }
+            return SnapshotUtil.snapshotIdAsOfTime(table, timestamp);
+        }
+    }
+
+    // read the schema from the external schema cache
+    public static IcebergSchemaCacheValue getSchemaCacheValue(
+            ExternalCatalog catalog, String dbName, String name, long schemaId) {
+        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+                new IcebergSchemaCacheKey(dbName, name, schemaId));
+        if (!schemaCacheValue.isPresent()) {
+            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                    null, catalog.getName(), dbName, name, schemaId);
+        }
+        return (IcebergSchemaCacheValue) schemaCacheValue.get();
+    }
+
+    public static IcebergSnapshot getLastedIcebergSnapshot(ExternalCatalog catalog, String dbName, String tbName) {
+        Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+        Snapshot snapshot = table.currentSnapshot();
+        long snapshotId = snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : snapshot.snapshotId();
+        return new IcebergSnapshot(snapshotId, table.schema().schemaId());
+    }
+
+    public static IcebergPartitionInfo loadPartitionInfo(
+            ExternalCatalog catalog, String dbName, String tbName, long snapshotId) throws AnalysisException {
+        // snapshotId == UNKNOWN_SNAPSHOT_ID means this is an empty table that does not contain any snapshot yet.
+        if (snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
+            return IcebergPartitionInfo.empty();
+        }
+        Table table = getIcebergTable(catalog, dbName, tbName);
+        List<IcebergPartition> icebergPartitions = loadIcebergPartition(table, snapshotId);
+        Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
+        Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+
+        List<Column> partitionColumns = IcebergUtils.getSchemaCacheValue(
+                catalog, dbName, tbName, table.snapshot(snapshotId).schemaId()).getPartitionColumns();
+        for (IcebergPartition partition : icebergPartitions) {
+            nameToPartition.put(partition.getPartitionName(), partition);
+            String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
+            Range<PartitionKey> partitionRange = getPartitionRange(
+                    partition.getPartitionValues().get(0), transform, partitionColumns);
+            PartitionItem item = new RangePartitionItem(partitionRange);
+            nameToPartitionItem.put(partition.getPartitionName(), item);
+        }
+        Map<String, Set<String>> partitionNameMap = mergeOverlapPartitions(nameToPartitionItem);
+        return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap);
+    }
+
+    private static List<IcebergPartition> loadIcebergPartition(Table table, long snapshotId) {
+        PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
+                .createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
+        List<IcebergPartition> partitions = Lists.newArrayList();
+        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
+            for (FileScanTask task : tasks) {
+                CloseableIterable<StructLike> rows = task.asDataTask().rows();
+                for (StructLike row : rows) {
+                    partitions.add(generateIcebergPartition(table, row));
+                }
+            }
+        } catch (IOException e) {
+            LOG.warn("Failed to get Iceberg table {} partition info.", table.name(), e);
+        }
+        return partitions;
+    }
+
+    private static IcebergPartition generateIcebergPartition(Table table, StructLike row) {
+        // row format :
+        //  0. partitionData,
+        //  1. spec_id,
+        //  2. record_count,
+        //  3. file_count,
+        //  4. total_data_file_size_in_bytes,
+        //  5. position_delete_record_count,
+        //  6. position_delete_file_count,
+        //  7. equality_delete_record_count,
+        //  8. equality_delete_file_count,
+        //  9. last_updated_at,
+        //  10. last_updated_snapshot_id
+        Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partitioned table.");
+        int specId = row.get(1, Integer.class);
+        PartitionSpec partitionSpec = table.specs().get(specId);
+        StructProjection partitionData = row.get(0, StructProjection.class);
+        StringBuilder sb = new StringBuilder();
+        List<String> partitionValues = Lists.newArrayList();
+        List<String> transforms = Lists.newArrayList();
+        for (int i = 0; i < partitionSpec.fields().size(); ++i) {
+            PartitionField partitionField = partitionSpec.fields().get(i);
+            Class<?> fieldClass = partitionSpec.javaClasses()[i];
+            int fieldId = partitionField.fieldId();
+            // Iceberg partition field ids start at PARTITION_DATA_ID_START,
+            // so we can get the field index in partitionData using fieldId - PARTITION_DATA_ID_START
+            int index = fieldId - PARTITION_DATA_ID_START;
+            Object o = partitionData.get(index, fieldClass);
+            String fieldValue = o == null ? null : o.toString();
+            String fieldName = partitionField.name();
+            sb.append(fieldName);
+            sb.append("=");
+            sb.append(fieldValue);
+            sb.append("/");
+            partitionValues.add(fieldValue);
+            transforms.add(partitionField.transform().toString());
+        }
+        if (sb.length() > 0) {
+            sb.delete(sb.length() - 1, sb.length());
+        }
+        String partitionName = sb.toString();
+        long recordCount = row.get(2, Long.class);
+        long fileCount = row.get(3, Integer.class);
+        long fileSizeInBytes = row.get(4, Long.class);
+        long lastUpdateTime = row.get(9, Long.class);
+        long lastUpdateSnapShotId = row.get(10, Long.class);
+        return new IcebergPartition(partitionName, specId, recordCount, fileSizeInBytes, fileCount,
+                lastUpdateTime, lastUpdateSnapShotId, partitionValues, transforms);
+    }
+
+    @VisibleForTesting
+    public static Range<PartitionKey> getPartitionRange(String value, String transform, List<Column> partitionColumns)
+            throws AnalysisException {
+        // For a NULL value, create a minimum partition for it.
+        if (value == null) {
+            PartitionKey nullLowKey = PartitionKey.createPartitionKey(
+                    Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns);
+            PartitionKey nullUpKey = nullLowKey.successor();
+            return Range.closedOpen(nullLowKey, nullUpKey);
+        }
+        LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
+        LocalDateTime target;
+        LocalDateTime lower;
+        LocalDateTime upper;
+        long longValue = Long.parseLong(value);
+        switch (transform) {
+            case HOUR:
+                target = epoch.plusHours(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(),
+                        target.getHour(), 0, 0);
+                upper = lower.plusHours(1);
+                break;
+            case DAY:
+                target = epoch.plusDays(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), target.getDayOfMonth(), 0, 0, 0);
+                upper = lower.plusDays(1);
+                break;
+            case MONTH:
+                target = epoch.plusMonths(longValue);
+                lower = LocalDateTime.of(target.getYear(), target.getMonth(), 1, 0, 0, 0);
+                upper = lower.plusMonths(1);
+                break;
+            case YEAR:
+                target = epoch.plusYears(longValue);
+                lower = LocalDateTime.of(target.getYear(), Month.JANUARY, 1, 0, 0, 0);
+                upper = lower.plusYears(1);
+                break;
+            default:
+                throw new RuntimeException("Unsupported transform " + transform);
+        }
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+        Column c = partitionColumns.get(0);
+        Preconditions.checkState(c.getDataType().isDateType(), "Only support date type partition column");
+        if (c.getType().isDate() || c.getType().isDateV2()) {
+            formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+        }
+        PartitionValue lowerValue = new PartitionValue(lower.format(formatter));
+        PartitionValue upperValue = new PartitionValue(upper.format(formatter));
+        PartitionKey lowKey = PartitionKey.createPartitionKey(Lists.newArrayList(lowerValue), partitionColumns);
+        PartitionKey upperKey = PartitionKey.createPartitionKey(Lists.newArrayList(upperValue), partitionColumns);
+        return Range.closedOpen(lowKey, upperKey);
+    }
+
+    /**
+     * Merge overlapped iceberg partitions into one Doris partition.
+     */
+    @VisibleForTesting
+    public static Map<String, Set<String>> mergeOverlapPartitions(Map<String, PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = sortPartitionMap(originPartitions);
+        Map<String, Set<String>> map = Maps.newHashMap();
+        for (int i = 0; i < entries.size() - 1; i++) {
+            Range<PartitionKey> firstValue = entries.get(i).getValue().getItems();
+            String firstKey = entries.get(i).getKey();
+            Range<PartitionKey> secondValue = entries.get(i + 1).getValue().getItems();
+            String secondKey = entries.get(i + 1).getKey();
+            // If the first entry encloses the second one, remove the second entry and keep a record in the
+            // returned map, so we can track the iceberg partitions that are contained by one Doris partition.
+            while (i < entries.size() && firstValue.encloses(secondValue)) {
+                originPartitions.remove(secondKey);
+                map.putIfAbsent(firstKey, Sets.newHashSet(firstKey));
+                String finalSecondKey = secondKey;
+                map.computeIfPresent(firstKey, (key, value) -> {
+                    value.add(finalSecondKey);
+                    return value;
+                });
+                i++;
+                if (i >= entries.size() - 1) {
+                    break;
+                }
+                secondValue = entries.get(i + 1).getValue().getItems();
+                secondKey = entries.get(i + 1).getKey();
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Sort the given map entries by PartitionItem Range(LOW, HIGH).
+     * When comparing two ranges, the one with the smaller LOW value is smaller than the other one.
+     * If two ranges have the same LOW value, the one with the larger HIGH value is smaller.
+     *
+     * For now, we only support year, month, day and hour,
+     * so it is impossible to have two partially intersecting partitions.
+     * One range is either enclosed by another or has no intersection with it at all.
+     *
+     * For example, we have these 4 ranges:
+     * [10, 20), [30, 40), [0, 30), [10, 15)
+     *
+     * After sorting, they become:
+     * [0, 30), [10, 20), [10, 15), [30, 40)
+     */
+    @VisibleForTesting
+    public static List<Map.Entry<String, PartitionItem>> sortPartitionMap(Map<String, PartitionItem> originPartitions) {
+        List<Map.Entry<String, PartitionItem>> entries = new ArrayList<>(originPartitions.entrySet());
+        entries.sort(new RangeComparator());
+        return entries;
+    }
+
+    public static class RangeComparator implements Comparator<Map.Entry<String, PartitionItem>> {
+        @Override
+        public int compare(Map.Entry<String, PartitionItem> p1, Map.Entry<String, PartitionItem> p2) {
+            PartitionItem value1 = p1.getValue();
+            PartitionItem value2 = p2.getValue();
+            if (value1 instanceof RangePartitionItem && value2 instanceof RangePartitionItem) {
+                Range<PartitionKey> items1 = value1.getItems();
+                Range<PartitionKey> items2 = value2.getItems();
+                if (!items1.hasLowerBound()) {
+                    return -1;
+                }
+                if (!items2.hasLowerBound()) {
+                    return 1;
+                }
+                PartitionKey upper1 = items1.upperEndpoint();
+                PartitionKey lower1 = items1.lowerEndpoint();
+                PartitionKey upper2 = items2.upperEndpoint();
+                PartitionKey lower2 = items2.lowerEndpoint();
+                int compareLow = lower1.compareTo(lower2);
+                return compareLow == 0 ? upper2.compareTo(upper1) : compareLow;
+            }
+            return 0;
+        }
+    }
+
+    public static IcebergSnapshotCacheValue getIcebergSnapshotCacheValue(
+            Optional<TableSnapshot> tableSnapshot,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        IcebergSnapshotCacheValue snapshotCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+                .getSnapshotCache(catalog, dbName, tbName);
+        if (tableSnapshot.isPresent()) {
+            // If a snapshot is specified,
+            // use the specified snapshot and the corresponding schema (not the latest schema).
+            Table icebergTable = getIcebergTable(catalog, dbName, tbName);
+            TableSnapshot snapshot = tableSnapshot.get();
+            long querySpecSnapshot = getQuerySpecSnapshot(icebergTable, snapshot);
+            return new IcebergSnapshotCacheValue(
+                    IcebergPartitionInfo.empty(),
+                    new IcebergSnapshot(querySpecSnapshot, icebergTable.snapshot(querySpecSnapshot).schemaId()));
+        } else {
+            // Otherwise, use the latest snapshot and the latest schema.
+            return snapshotCache;
+        }
+    }
+
+    // Load the table schema from the Iceberg API into the external schema cache.
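+    // Besides the full column list for the given schema id, this also collects the partition
+    // source columns of the current spec, so IcebergSchemaCacheValue can answer both schema
+    // and partition-column lookups.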
+    public static Optional<SchemaCacheValue> loadSchemaCacheValue(
+            ExternalCatalog catalog, String dbName, String tbName, long schemaId) {
+        Table table = IcebergUtils.getIcebergTable(catalog, dbName, tbName);
+        List<Column> schema = IcebergUtils.getSchema(catalog, dbName, tbName, schemaId);
+        List<Column> tmpColumns = Lists.newArrayList();
+        PartitionSpec spec = table.spec();
+        for (PartitionField field : spec.fields()) {
+            Types.NestedField col = table.schema().findField(field.sourceId());
+            for (Column c : schema) {
+                if (c.getName().equalsIgnoreCase(col.name())) {
+                    tmpColumns.add(c);
+                    break;
+                }
+            }
+        }
+        return Optional.of(new IcebergSchemaCacheValue(schema, tmpColumns));
+    }
+
+    public static List<Column> getIcebergSchema(
+            TableIf tableIf,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(tableIf);
+        IcebergSnapshotCacheValue cacheValue =
+                IcebergUtils.getOrFetchSnapshotCacheValue(snapshotFromContext, catalog, dbName, tbName);
+        return IcebergUtils.getSchemaCacheValue(
+                catalog, dbName, tbName, cacheValue.getSnapshot().getSchemaId())
+                .getSchema();
+    }
+
+    public static IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(
+            Optional<MvccSnapshot> snapshot,
+            ExternalCatalog catalog,
+            String dbName,
+            String tbName) {
+        if (snapshot.isPresent()) {
+            return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue();
+        } else {
+            return IcebergUtils.getIcebergSnapshotCacheValue(Optional.empty(), catalog, dbName, tbName);
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 30a2380a272..a634aa6c91f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -27,7 +27,6 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.LocationPath;
-import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
@@ -66,13 +65,11 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
-import java.time.DateTimeException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -392,16 +389,7 @@ public class IcebergScanNode extends FileQueryScanNode {
     public Long getSpecifiedSnapshot() {
         TableSnapshot tableSnapshot = getQueryTableSnapshot();
         if (tableSnapshot != null) {
-            TableSnapshot.VersionType type = tableSnapshot.getType();
-            if (type == TableSnapshot.VersionType.VERSION) {
-                return tableSnapshot.getVersion();
-            } else {
-                long timestamp = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
-                if (timestamp < 0) {
-                    throw new DateTimeException("can't parse time: " + tableSnapshot.getTime());
-                }
-                return SnapshotUtil.snapshotIdAsOfTime(icebergTable, timestamp);
-            }
+            return IcebergUtils.getQuerySpecSnapshot(icebergTable, tableSnapshot);
         }
         return null;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 37488a001f6..1dde5f633ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -110,6 +110,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
     protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
 
     protected TableSnapshot tableSnapshot;
+    protected List<Column> columns;
 
     // Save the id of backends which this scan node will be executed on.
     // This is also important for local shuffle logic.
@@ -141,6 +142,13 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
         return result;
     }
 
+    protected List<Column> getColumns() {
+        if (columns == null && desc.getTable() != null) {
+            columns = desc.getTable().getBaseSchema();
+        }
+        return columns;
+    }
+
     public TupleDescriptor getTupleDesc() {
         return desc;
     }
@@ -233,7 +241,7 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
         // for load scan node, table is null
         // partitionsInfo maybe null for other scan node, eg: ExternalScanNode...
         if (desc.getTable() != null) {
-            computeColumnsFilter(desc.getTable().getBaseSchema(), partitionsInfo);
+            computeColumnsFilter(getColumns(), partitionsInfo);
         }
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
index 49144d874a4..f7305e6eb7c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -171,42 +171,39 @@ public class IcebergExternalTableTest {
 
     @Test
     public void testGetPartitionRange() throws AnalysisException {
-        IcebergExternalDatabase database = new IcebergExternalDatabase(null, 1L, "2", "2");
-        IcebergExternalTable table = new IcebergExternalTable(1, "1", "1", null, database);
         Column c = new Column("ts", PrimitiveType.DATETIMEV2);
         List<Column> partitionColumns = Lists.newArrayList(c);
-        table.setPartitionColumns(partitionColumns);
 
         // Test null partition value
-        Range<PartitionKey> nullRange = table.getPartitionRange(null, "hour", partitionColumns);
+        Range<PartitionKey> nullRange = IcebergUtils.getPartitionRange(null, "hour", partitionColumns);
         Assertions.assertEquals("0000-01-01 00:00:00",
                 nullRange.lowerEndpoint().getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("0000-01-01 00:00:01",
                 nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0));
 
         // Test hour transform.
-        Range<PartitionKey> hour = table.getPartitionRange("100", "hour", partitionColumns);
+        Range<PartitionKey> hour = IcebergUtils.getPartitionRange("100", "hour", partitionColumns);
         PartitionKey lowKey = hour.lowerEndpoint();
         PartitionKey upKey = hour.upperEndpoint();
         Assertions.assertEquals("1970-01-05 04:00:00", lowKey.getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("1970-01-05 05:00:00", upKey.getPartitionValuesAsStringList().get(0));
 
         // Test day transform.
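         // 100 days after the Unix epoch is 1970-04-11, so the expected range is [1970-04-11, 1970-04-12).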
-        Range<PartitionKey> day = table.getPartitionRange("100", "day", partitionColumns);
+        Range<PartitionKey> day = IcebergUtils.getPartitionRange("100", "day", partitionColumns);
         lowKey = day.lowerEndpoint();
         upKey = day.upperEndpoint();
         Assertions.assertEquals("1970-04-11 00:00:00", lowKey.getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("1970-04-12 00:00:00", upKey.getPartitionValuesAsStringList().get(0));
 
         // Test month transform.
-        Range<PartitionKey> month = table.getPartitionRange("100", "month", partitionColumns);
+        Range<PartitionKey> month = IcebergUtils.getPartitionRange("100", "month", partitionColumns);
         lowKey = month.lowerEndpoint();
         upKey = month.upperEndpoint();
         Assertions.assertEquals("1978-05-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0));
         Assertions.assertEquals("1978-06-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0));
 
         // Test year transform.
-        Range<PartitionKey> year = table.getPartitionRange("100", "year", partitionColumns);
+        Range<PartitionKey> year = IcebergUtils.getPartitionRange("100", "year", partitionColumns);
         lowKey = year.lowerEndpoint();
         upKey = year.upperEndpoint();
         Assertions.assertEquals("2070-01-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0));
@@ -214,26 +211,23 @@ public class IcebergExternalTableTest {
 
         // Test unsupported transform
         Exception exception = Assertions.assertThrows(RuntimeException.class, () -> {
-            table.getPartitionRange("100", "bucket", partitionColumns);
+            IcebergUtils.getPartitionRange("100", "bucket", partitionColumns);
         });
         Assertions.assertEquals("Unsupported transform bucket", exception.getMessage());
     }
 
     @Test
     public void testSortRange() throws AnalysisException {
-        IcebergExternalDatabase database = new IcebergExternalDatabase(null, 1L, "2", "2");
-        IcebergExternalTable table = new IcebergExternalTable(1, "1", "1", null, database);
         Column c = new Column("c", PrimitiveType.DATETIMEV2);
         ArrayList<Column> columns = Lists.newArrayList(c);
-        table.setPartitionColumns(Lists.newArrayList(c));
-        PartitionItem nullRange = new RangePartitionItem(table.getPartitionRange(null, "hour", columns));
-        PartitionItem year1970 = new RangePartitionItem(table.getPartitionRange("0", "year", columns));
-        PartitionItem year1971 = new RangePartitionItem(table.getPartitionRange("1", "year", columns));
-        PartitionItem month197002 = new RangePartitionItem(table.getPartitionRange("1", "month", columns));
-        PartitionItem month197103 = new RangePartitionItem(table.getPartitionRange("14", "month", columns));
-        PartitionItem month197204 = new RangePartitionItem(table.getPartitionRange("27", "month", columns));
-        PartitionItem day19700202 = new RangePartitionItem(table.getPartitionRange("32", "day", columns));
-        PartitionItem day19730101 = new RangePartitionItem(table.getPartitionRange("1096", "day", columns));
+        PartitionItem nullRange = new RangePartitionItem(IcebergUtils.getPartitionRange(null, "hour", columns));
+        PartitionItem year1970 = new RangePartitionItem(IcebergUtils.getPartitionRange("0", "year", columns));
+        PartitionItem year1971 = new RangePartitionItem(IcebergUtils.getPartitionRange("1", "year", columns));
+        PartitionItem month197002 = new RangePartitionItem(IcebergUtils.getPartitionRange("1", "month", columns));
+        PartitionItem month197103 = new RangePartitionItem(IcebergUtils.getPartitionRange("14", "month", columns));
+        PartitionItem month197204 = new RangePartitionItem(IcebergUtils.getPartitionRange("27", "month", columns));
+        PartitionItem day19700202 = new RangePartitionItem(IcebergUtils.getPartitionRange("32", "day", columns));
+        PartitionItem day19730101 = new RangePartitionItem(IcebergUtils.getPartitionRange("1096", "day", columns));
         Map<String, PartitionItem> map = Maps.newHashMap();
         map.put("nullRange", nullRange);
         map.put("year1970", year1970);
@@ -243,7 +237,7 @@ public class IcebergExternalTableTest {
         map.put("month197204", month197204);
         map.put("day19700202", day19700202);
         map.put("day19730101", day19730101);
-        List<Map.Entry<String, PartitionItem>> entries = table.sortPartitionMap(map);
+        List<Map.Entry<String, PartitionItem>> entries = IcebergUtils.sortPartitionMap(map);
         Assertions.assertEquals(8, entries.size());
         Assertions.assertEquals("nullRange", entries.get(0).getKey());
         Assertions.assertEquals("year1970", entries.get(1).getKey());
@@ -254,7 +248,7 @@ public class IcebergExternalTableTest {
         Assertions.assertEquals("month197204", entries.get(6).getKey());
         Assertions.assertEquals("day19730101", entries.get(7).getKey());
 
-        Map<String, Set<String>> stringSetMap = table.mergeOverlapPartitions(map);
+        Map<String, Set<String>> stringSetMap = IcebergUtils.mergeOverlapPartitions(map);
         Assertions.assertEquals(2, stringSetMap.size());
         Assertions.assertTrue(stringSetMap.containsKey("year1970"));
         Assertions.assertTrue(stringSetMap.containsKey("year1971"));
diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out
new file mode 100644
index 00000000000..27e9359b713
Binary files /dev/null and b/regression-test/data/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.out differ
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
new file mode 100644
index 00000000000..a376dfc210d
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_schema_change_with_timetravel.groovy
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
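+
+// This suite time-travels across the five snapshots of the two tables created by the
+// preinstalled script (see the SQL replayed in the comment at the end of this file).
+// Columns are added and dropped between inserts, so each "for version as of" query must
+// be parsed with the schema of its snapshot rather than the latest one.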
+
+
+
+suite("iceberg_schema_change_with_timetravel", "p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "iceberg_schema_change_with_timetravel"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+    sql """ use test_db;"""
+
+    def executeTimeTravelingQueries = { String tableName ->
+        def snapshots = sql """ select snapshot_id from iceberg_meta("table" = "${catalog_name}.test_db.${tableName}", "query_type" = "snapshots") order by committed_at; """
+        def snapshotIds = [
+            s0: snapshots.get(0)[0],
+            s1: snapshots.get(1)[0],
+            s2: snapshots.get(2)[0],
+            s3: snapshots.get(3)[0],
+            s4: snapshots.get(4)[0]
+        ]
+
+        qt_q0 """ desc ${tableName} """
+        qt_q1 """ select * from ${tableName} order by c1 """
+        qt_q2 """ select * from ${tableName} for version as of ${snapshotIds.s0} order by c1 """
+        qt_q3 """ select * from ${tableName} for version as of ${snapshotIds.s1} order by c1 """
+        qt_q4 """ select * from ${tableName} for version as of ${snapshotIds.s2} order by c1 """
+        qt_q5 """ select * from ${tableName} for version as of ${snapshotIds.s3} order by c1 """
+        qt_q6 """ select * from ${tableName} for version as of ${snapshotIds.s4} order by c1 """
+    }
+
+    executeTimeTravelingQueries("schema_change_with_time_travel")
+    executeTimeTravelingQueries("schema_change_with_time_travel_orc")
+
+}
+
+/*
+create table schema_change_with_time_travel (c1 int);
+insert into schema_change_with_time_travel values (1);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (2,3);
+alter table schema_change_with_time_travel add column c3 int;
+insert into schema_change_with_time_travel values (4,5,6);
+alter table schema_change_with_time_travel drop column c2;
+insert into schema_change_with_time_travel values (7,8);
+alter table schema_change_with_time_travel add column c2 int;
+insert into schema_change_with_time_travel values (9,10,11);
+alter table schema_change_with_time_travel add column c4 int;
+*/

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
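The core of the fix above is the snapshot-to-schema lookup. For reference, a minimal
standalone sketch of that lookup against the public Iceberg API; the class and method
names here are illustrative only, not Doris code:

import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;

// Illustrative helper, not part of this commit.
public final class SnapshotSchemas {
    private SnapshotSchemas() {
    }

    // Time travel by snapshot id: use the schema that snapshot was written with.
    // Without time travel (snapshotId == null): use the table's latest schema.
    // Assumes snapshotId, when given, refers to an existing snapshot of the table.
    public static Schema schemaFor(Table table, Long snapshotId) {
        if (snapshotId == null) {
            return table.schema();
        }
        Snapshot snapshot = table.snapshot(snapshotId);
        Integer schemaId = snapshot.schemaId(); // may be null on metadata from very old writers
        return schemaId == null ? table.schema() : table.schemas().get(schemaId);
    }

    // Time travel by timestamp ("FOR TIME AS OF"): first resolve the snapshot that was
    // current at that time, then resolve its schema as above.
    public static Schema schemaAsOf(Table table, long timestampMillis) {
        return schemaFor(table, SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis));
    }
}

This mirrors what getQuerySpecSnapshot plus the schema-id-keyed schema cache do in the
patch, minus the caching and error handling.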