This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new f4e5727c076 branch-3.0: [feat](mtmv)mtmv support paimon partition refresh #43959 #44911 (#45660) f4e5727c076 is described below commit f4e5727c0767c779a9dda0b45877de205c78d5eb Author: zhangdong <zhangd...@selectdb.com> AuthorDate: Tue Dec 24 19:26:18 2024 +0800 branch-3.0: [feat](mtmv)mtmv support paimon partition refresh #43959 #44911 (#45660) pick: #44911 #43959 only pick code about paimon, not pick some code about MTMV REFRESH --- .../apache/doris/datasource/ExternalCatalog.java | 9 +- .../doris/datasource/ExternalMetaCacheMgr.java | 12 + .../doris/datasource/ExternalSchemaCache.java | 6 +- .../org/apache/doris/datasource/ExternalTable.java | 9 +- .../doris/datasource/hive/HMSExternalTable.java | 7 +- .../maxcompute/MaxComputeExternalTable.java | 2 +- .../MvccUtil.java} | 45 ++-- .../datasource/paimon/PaimonExternalTable.java | 228 +++++++++-------- .../datasource/paimon/PaimonMetadataCache.java | 144 +++++++++++ .../datasource/paimon/PaimonMetadataCacheMgr.java | 49 ++++ ...hemaCacheValue.java => PaimonMvccSnapshot.java} | 21 +- .../doris/datasource/paimon/PaimonPartition.java | 61 +++++ ...emaCacheValue.java => PaimonPartitionInfo.java} | 31 ++- ...imonSchemaCacheValue.java => PaimonSchema.java} | 29 ++- .../datasource/paimon/PaimonSchemaCacheKey.java | 55 +++++ .../datasource/paimon/PaimonSchemaCacheValue.java | 12 +- ...onSchemaCacheValue.java => PaimonSnapshot.java} | 25 +- .../datasource/paimon/PaimonSnapshotCacheKey.java | 75 ++++++ ...cheValue.java => PaimonSnapshotCacheValue.java} | 24 +- .../apache/doris/datasource/paimon/PaimonUtil.java | 275 +++++++++++++++++++++ .../datasource/paimon/source/PaimonSource.java | 3 +- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 14 ++ .../org/apache/doris/nereids/StatementContext.java | 26 +- .../rules/rewrite/PruneFileScanPartition.java | 9 +- .../plans/commands/UpdateMvByPartitionCommand.java | 7 + 
.../trees/plans/logical/LogicalFileScan.java | 7 +- .../java/org/apache/doris/mtmv/PaimonUtilTest.java | 71 ++++++ regression-test/data/mtmv_p0/test_paimon_mtmv.out | 9 - .../suites/mtmv_p0/test_paimon_mtmv.groovy | 62 ----- 29 files changed, 1038 insertions(+), 289 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index c8ca21e88ef..d1df51177fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -41,6 +41,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.es.EsExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalDatabase; @@ -453,13 +454,13 @@ public abstract class ExternalCatalog } } - public final Optional<SchemaCacheValue> getSchema(String dbName, String tblName) { + public final Optional<SchemaCacheValue> getSchema(SchemaCacheKey key) { makeSureInitialized(); - Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(dbName); + Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(key.getDbName()); if (db.isPresent()) { - Optional<? extends ExternalTable> table = db.get().getTable(tblName); + Optional<? 
extends ExternalTable> table = db.get().getTable(key.getTblName()); if (table.isPresent()) { - return table.get().initSchemaAndUpdateTime(); + return table.get().initSchemaAndUpdateTime(key); } } return Optional.empty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index cc40ad292ce..24f55e74266 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -31,6 +31,8 @@ import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr; import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache; import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr; import org.apache.doris.datasource.metacache.MetaCache; +import org.apache.doris.datasource.paimon.PaimonMetadataCache; +import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.nereids.exceptions.NotSupportedException; @@ -92,6 +94,7 @@ public class ExternalMetaCacheMgr { private ExternalRowCountCache rowCountCache; private final IcebergMetadataCacheMgr icebergMetadataCacheMgr; private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; + private final PaimonMetadataCacheMgr paimonMetadataCacheMgr; public ExternalMetaCacheMgr() { rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( @@ -122,6 +125,7 @@ public class ExternalMetaCacheMgr { hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor); icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); + paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor); } public ExecutorService getFileListingExecutor() { @@ -167,6 +171,10 @@ public class ExternalMetaCacheMgr { return 
icebergMetadataCacheMgr.getIcebergMetadataCache(); } + public PaimonMetadataCache getPaimonMetadataCache() { + return paimonMetadataCacheMgr.getPaimonMetadataCache(); + } + public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) { return maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId); } @@ -189,6 +197,7 @@ public class ExternalMetaCacheMgr { hudiPartitionMgr.removePartitionProcessor(catalogId); icebergMetadataCacheMgr.removeCache(catalogId); maxComputeMetadataCacheMgr.removeCache(catalogId); + paimonMetadataCacheMgr.removeCache(catalogId); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -204,6 +213,7 @@ public class ExternalMetaCacheMgr { hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName); icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); + paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); if (LOG.isDebugEnabled()) { LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId); } @@ -222,6 +232,7 @@ public class ExternalMetaCacheMgr { hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName); icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName); maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName); + paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName); if (LOG.isDebugEnabled()) { LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId); } @@ -239,6 +250,7 @@ public class ExternalMetaCacheMgr { hudiPartitionMgr.cleanPartitionProcess(catalogId); icebergMetadataCacheMgr.invalidateCatalogCache(catalogId); maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId); + paimonMetadataCacheMgr.invalidateCatalogCache(catalogId); if (LOG.isDebugEnabled()) { LOG.debug("invalid catalog cache for {}", catalogId); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index a0558766e81..de3eeff75d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -74,7 +74,7 @@ public class ExternalSchemaCache { } private Optional<SchemaCacheValue> loadSchema(SchemaCacheKey key) { - Optional<SchemaCacheValue> schema = catalog.getSchema(key.dbName, key.tblName); + Optional<SchemaCacheValue> schema = catalog.getSchema(key); if (LOG.isDebugEnabled()) { LOG.debug("load schema for {} in catalog {}", key, catalog.getName()); } @@ -83,6 +83,10 @@ public class ExternalSchemaCache { public Optional<SchemaCacheValue> getSchemaValue(String dbName, String tblName) { SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); + return getSchemaValue(key); + } + + public Optional<SchemaCacheValue> getSchemaValue(SchemaCacheKey key) { return schemaCache.get(key); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index bd1e36e7bc9..91df061678f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -31,6 +31,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; @@ -317,8 +318,12 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { * * 
@return */ - public Optional<SchemaCacheValue> initSchemaAndUpdateTime() { + public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey key) { schemaUpdateTime = System.currentTimeMillis(); + return initSchema(key); + } + + public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) { return initSchema(); } @@ -399,7 +404,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { * @param snapshot if not support mvcc, ignore this * @return partitionName ==> PartitionItem */ - protected Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) { + public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) { return Collections.emptyMap(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index ad685386ec9..da4670d6d05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -30,6 +30,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TablePartitionValues; @@ -330,7 +331,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - protected Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) { + public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) { return getNameToPartitionItems(); } @@ -501,6 +502,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI 
} @Override + public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey key) { + return initSchemaAndUpdateTime(); + } + public Optional<SchemaCacheValue> initSchemaAndUpdateTime() { org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient() .getTable(dbName, name); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 0f748f59e92..dbbbcf2d6a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -92,7 +92,7 @@ public class MaxComputeExternalTable extends ExternalTable { } @Override - protected Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) { + public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) { if (getPartitionColumns().isEmpty()) { return Collections.emptyMap(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java similarity index 50% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java index aaaefe7f32d..ffdaff770e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java @@ -15,25 +15,30 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.datasource.paimon; - -import org.apache.doris.catalog.Column; -import org.apache.doris.datasource.SchemaCacheValue; - -import org.apache.paimon.table.Table; - -import java.util.List; - -public class PaimonSchemaCacheValue extends SchemaCacheValue { - - private Table paimonTable; - - public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) { - super(schema); - this.paimonTable = paimonTable; - } - - public Table getPaimonTable() { - return paimonTable; +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.qe.ConnectContext; + +import java.util.Optional; + +public class MvccUtil { + /** + * get Snapshot From StatementContext + * + * @param tableIf + * @return MvccSnapshot + */ + public static Optional<MvccSnapshot> getSnapshotFromContext(TableIf tableIf) { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return Optional.empty(); + } + StatementContext statementContext = connectContext.getStatementContext(); + if (statementContext == null) { + return Optional.empty(); + } + return statementContext.getSnapshot(tableIf); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index c9eaf1b7df3..7b59d879d93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -18,10 +18,16 @@ package org.apache.doris.datasource.paimon; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.ScalarType; -import org.apache.doris.catalog.Type; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.datasource.CacheException; +import 
org.apache.doris.datasource.ExternalSchemaCache; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -30,30 +36,36 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; -import org.apache.paimon.types.ArrayType; +import org.apache.paimon.table.system.SchemasTable; import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DecimalType; -import org.apache.paimon.types.MapType; -import org.apache.paimon.types.RowType; -import java.util.ArrayList; +import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; +import java.util.Set; -public class PaimonExternalTable extends ExternalTable { +public class PaimonExternalTable extends ExternalTable implements MvccTable { private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); + private final Table paimonTable; + public 
PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE); + this.paimonTable = catalog.getPaimonTable(dbName, name); } public String getPaimonCatalogType() { @@ -67,99 +79,27 @@ public class PaimonExternalTable extends ExternalTable { } } - public Table getPaimonTable() { - makeSureInitialized(); - Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); - return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); + public Table getPaimonTable(Optional<MvccSnapshot> snapshot) { + return paimonTable.copy( + Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), + String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId()))); } - @Override - public Optional<SchemaCacheValue> initSchema() { - Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); - TableSchema schema = ((FileStoreTable) paimonTable).schema(); - List<DataField> columns = schema.fields(); - List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size()); - for (DataField field : columns) { - tmpSchema.add(new Column(field.name().toLowerCase(), - paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, - field.id())); - } - return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable)); - } - - private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { - int tsScale = 3; // default - switch (dataType.getTypeRoot()) { - case BOOLEAN: - return Type.BOOLEAN; - case INTEGER: - return Type.INT; - case BIGINT: - return Type.BIGINT; - case FLOAT: - return Type.FLOAT; - case DOUBLE: - return Type.DOUBLE; - case SMALLINT: - return Type.SMALLINT; - case TINYINT: - return Type.TINYINT; - case VARCHAR: - case BINARY: - case CHAR: - case VARBINARY: - return Type.STRING; - case DECIMAL: - DecimalType decimal = (DecimalType) dataType; - return 
ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale()); - case DATE: - return ScalarType.createDateV2Type(); - case TIMESTAMP_WITHOUT_TIME_ZONE: - if (dataType instanceof org.apache.paimon.types.TimestampType) { - tsScale = ((org.apache.paimon.types.TimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { - tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } - return ScalarType.createDatetimeV2Type(tsScale); - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { - tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } - return ScalarType.createDatetimeV2Type(tsScale); - case ARRAY: - ArrayType arrayType = (ArrayType) dataType; - Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType()); - return org.apache.doris.catalog.ArrayType.create(innerType, true); - case MAP: - MapType mapType = (MapType) dataType; - return new org.apache.doris.catalog.MapType( - paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType())); - case ROW: - RowType rowType = (RowType) dataType; - List<DataField> fields = rowType.getFields(); - return new org.apache.doris.catalog.StructType(fields.stream() - .map(field -> new org.apache.doris.catalog.StructField(field.name(), - paimonTypeToDorisType(field.type()))) - .collect(Collectors.toCollection(ArrayList::new))); - case TIME_WITHOUT_TIME_ZONE: - return Type.UNSUPPORTED; - default: - LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot()); - return Type.UNSUPPORTED; + public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) { + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + 
Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue( + new PaimonSchemaCacheKey(dbName, name, schemaId)); + if (!schemaCacheValue.isPresent()) { + throw new CacheException("failed to getSchema for: %s.%s.%s.%s", + null, catalog.getName(), dbName, name, schemaId); } + return (PaimonSchemaCacheValue) schemaCacheValue.get(); } - protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { - return paimonPrimitiveTypeToDorisType(type); + private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() { + makeSureInitialized(); + return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() + .getPaimonSnapshot(catalog, dbName, name); } @Override @@ -189,13 +129,6 @@ public class PaimonExternalTable extends ExternalTable { public long fetchRowCount() { makeSureInitialized(); long rowCount = 0; - Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue(); - Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) - .orElse(null); - if (paimonTable == null) { - LOG.info("Paimon table {} is null.", name); - return UNKNOWN_ROW_COUNT; - } List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); @@ -205,4 +138,87 @@ public class PaimonExternalTable extends ExternalTable { } return rowCount > 0 ? 
rowCount : UNKNOWN_ROW_COUNT; } + + @Override + public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) { + return getPaimonSchemaCacheValue(snapshot).getPartitionColumns(); + } + + @Override + public MvccSnapshot loadSnapshot() { + return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue()); + } + + @Override + public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) { + return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem(); + } + + @Override + public boolean supportInternalPartitionPruned() { + return true; + } + + @Override + public List<Column> getFullSchema() { + return getPaimonSchemaCacheValue(MvccUtil.getSnapshotFromContext(this)).getSchema(); + } + + @Override + public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) { + makeSureInitialized(); + PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key; + try { + PaimonSchema schema = loadPaimonSchemaBySchemaId(paimonSchemaCacheKey); + List<DataField> columns = schema.getFields(); + List<Column> dorisColumns = Lists.newArrayListWithCapacity(columns.size()); + Set<String> partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys()); + List<Column> partitionColumns = Lists.newArrayList(); + for (DataField field : columns) { + Column column = new Column(field.name().toLowerCase(), + PaimonUtil.paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, + field.id()); + dorisColumns.add(column); + if (partitionColumnNames.contains(field.name())) { + partitionColumns.add(column); + } + } + return Optional.of(new PaimonSchemaCacheValue(dorisColumns, partitionColumns)); + } catch (Exception e) { + throw new CacheException("failed to initSchema for: %s.%s.%s.%s", + null, getCatalog().getName(), key.getDbName(), key.getTblName(), + paimonSchemaCacheKey.getSchemaId()); + } + } + + private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException { + Table 
table = ((PaimonExternalCatalog) getCatalog()).getPaimonTable(key.getDbName(), + name + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS); + PredicateBuilder builder = new PredicateBuilder(table.rowType()); + Predicate predicate = builder.equal(0, key.getSchemaId()); + // Adding predicates will also return excess data + List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate); + for (InternalRow row : rows) { + PaimonSchema schema = PaimonUtil.rowToSchema(row); + if (schema.getSchemaId() == key.getSchemaId()) { + return schema; + } + } + throw new CacheException("failed to initSchema for: %s.%s.%s.%s", + null, getCatalog().getName(), key.getDbName(), key.getTblName(), key.getSchemaId()); + } + + private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional<MvccSnapshot> snapshot) { + PaimonSnapshotCacheValue snapshotCacheValue = getOrFetchSnapshotCacheValue(snapshot); + return getPaimonSchemaCacheValue(snapshotCacheValue.getSnapshot().getSchemaId()); + } + + private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) { + if (snapshot.isPresent()) { + return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); + } else { + return getPaimonSnapshotCacheValue(); + } + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java new file mode 100644 index 00000000000..5b711e07066 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CacheFactory; +import org.apache.doris.common.Config; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalMetaCacheMgr; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.system.PartitionsTable; +import org.apache.paimon.table.system.SnapshotsTable; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; + +public class PaimonMetadataCache { + + private final LoadingCache<PaimonSnapshotCacheKey, PaimonSnapshotCacheValue> snapshotCache; + + public PaimonMetadataCache(ExecutorService executor) { + CacheFactory snapshotCacheFactory = new CacheFactory( + OptionalLong.of(28800L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + 
Config.max_external_table_cache_num, + true, + null); + this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), null, executor); + } + + @NotNull + private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) { + try { + PaimonSnapshot latestSnapshot = loadLatestSnapshot(key); + PaimonExternalTable table = (PaimonExternalTable) key.getCatalog().getDbOrAnalysisException(key.getDbName()) + .getTableOrAnalysisException(key.getTableName()); + List<Column> partitionColumns = table.getPaimonSchemaCacheValue(latestSnapshot.getSchemaId()) + .getPartitionColumns(); + PaimonPartitionInfo partitionInfo = loadPartitionInfo(key, partitionColumns); + return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot); + } catch (IOException | AnalysisException e) { + throw new CacheException("failed to loadSnapshot for: %s.%s.%s", + e, key.getCatalog().getName(), key.getDbName(), key.getTableName()); + } + } + + private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List<Column> partitionColumns) + throws IOException, AnalysisException { + if (CollectionUtils.isEmpty(partitionColumns)) { + return new PaimonPartitionInfo(); + } + List<PaimonPartition> paimonPartitions = loadPartitions(key); + return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + } + + private List<PaimonPartition> loadPartitions(PaimonSnapshotCacheKey key) + throws IOException { + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); + List<InternalRow> rows = PaimonUtil.read(table, null, null); + List<PaimonPartition> res = Lists.newArrayListWithCapacity(rows.size()); + for (InternalRow row : rows) { + res.add(PaimonUtil.rowToPartition(row)); + } + return res; + } + + private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException { + Table table = ((PaimonExternalCatalog) 
key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); + // snapshotId and schemaId + List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0}, {1}}, null); + long latestSnapshotId = 0L; + long latestSchemaId = 0L; + for (InternalRow row : rows) { + long snapshotId = row.getLong(0); + if (snapshotId > latestSnapshotId) { + latestSnapshotId = snapshotId; + latestSchemaId = row.getLong(1); + } + } + return new PaimonSnapshot(latestSnapshotId, latestSchemaId); + } + + public void invalidateCatalogCache(long catalogId) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId) + .forEach(snapshotCache::invalidate); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName) + && key.getTableName().equals( + tblName)) + .forEach(snapshotCache::invalidate); + } + + public void invalidateDbCache(long catalogId, String dbName) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName)) + .forEach(snapshotCache::invalidate); + } + + public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbName, String tbName) { + PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(catalog, dbName, tbName); + return snapshotCache.get(key); + } + + public Map<String, Map<String, String>> getCacheStats() { + Map<String, Map<String, String>> res = Maps.newHashMap(); + res.put("paimon_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), + snapshotCache.estimatedSize())); + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java new file mode 
100644 index 00000000000..a282fde665b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import java.util.concurrent.ExecutorService; + +public class PaimonMetadataCacheMgr { + + private PaimonMetadataCache paimonMetadataCache; + + public PaimonMetadataCacheMgr(ExecutorService executor) { + this.paimonMetadataCache = new PaimonMetadataCache(executor); + } + + public PaimonMetadataCache getPaimonMetadataCache() { + return paimonMetadataCache; + } + + public void removeCache(long catalogId) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateCatalogCache(long catalogId) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + paimonMetadataCache.invalidateTableCache(catalogId, dbName, tblName); + } + + public void invalidateDbCache(long catalogId, String dbName) { + paimonMetadataCache.invalidateDbCache(catalogId, dbName); + } +} diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java similarity index 65% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java index aaaefe7f32d..2307e91adb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java @@ -17,23 +17,16 @@ package org.apache.doris.datasource.paimon; -import org.apache.doris.catalog.Column; -import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.mvcc.MvccSnapshot; -import org.apache.paimon.table.Table; +public class PaimonMvccSnapshot implements MvccSnapshot { + private final PaimonSnapshotCacheValue snapshotCacheValue; -import java.util.List; - -public class PaimonSchemaCacheValue extends SchemaCacheValue { - - private Table paimonTable; - - public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) { - super(schema); - this.paimonTable = paimonTable; + public PaimonMvccSnapshot(PaimonSnapshotCacheValue snapshotCacheValue) { + this.snapshotCacheValue = snapshotCacheValue; } - public Table getPaimonTable() { - return paimonTable; + public PaimonSnapshotCacheValue getSnapshotCacheValue() { + return snapshotCacheValue; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java new file mode 100644 index 00000000000..545448199b3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +// https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table +public class PaimonPartition { + // Partition values, for example: [1, dd] + private final String partitionValues; + // The amount of data in the partition + private final long recordCount; + // Partition file size + private final long fileSizeInBytes; + // Number of partition files + private final long fileCount; + // Last update time of partition + private final long lastUpdateTime; + + public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount, + long lastUpdateTime) { + this.partitionValues = partitionValues; + this.recordCount = recordCount; + this.fileSizeInBytes = fileSizeInBytes; + this.fileCount = fileCount; + this.lastUpdateTime = lastUpdateTime; + } + + public String getPartitionValues() { + return partitionValues; + } + + public long getRecordCount() { + return recordCount; + } + + public long getFileSizeInBytes() { + return fileSizeInBytes; + } + + public long getFileCount() { + return fileCount; + } + + public long getLastUpdateTime() { + return lastUpdateTime; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java similarity index 50% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java index aaaefe7f32d..4d3326f8e48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java @@ -17,23 +17,32 @@ package org.apache.doris.datasource.paimon; -import org.apache.doris.catalog.Column; -import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.catalog.PartitionItem; -import org.apache.paimon.table.Table; +import com.google.common.collect.Maps; -import java.util.List; +import java.util.Map; -public class PaimonSchemaCacheValue extends SchemaCacheValue { +public class PaimonPartitionInfo { + private final Map<String, PartitionItem> nameToPartitionItem; + private final Map<String, PaimonPartition> nameToPartition; - private Table paimonTable; + public PaimonPartitionInfo() { + this.nameToPartitionItem = Maps.newHashMap(); + this.nameToPartition = Maps.newHashMap(); + } + + public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem, + Map<String, PaimonPartition> nameToPartition) { + this.nameToPartitionItem = nameToPartitionItem; + this.nameToPartition = nameToPartition; + } - public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) { - super(schema); - this.paimonTable = paimonTable; + public Map<String, PartitionItem> getNameToPartitionItem() { + return nameToPartitionItem; } - public Table getPaimonTable() { - return paimonTable; + public Map<String, PaimonPartition> getNameToPartition() { + return nameToPartition; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java similarity index 59% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java index aaaefe7f32d..ef26e1ed208 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java @@ -17,23 +17,30 @@ package org.apache.doris.datasource.paimon; -import org.apache.doris.catalog.Column; -import org.apache.doris.datasource.SchemaCacheValue; - -import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; import java.util.List; -public class PaimonSchemaCacheValue extends SchemaCacheValue { +public class PaimonSchema { + private final long schemaId; + private final List<DataField> fields; + private final List<String> partitionKeys; + + public PaimonSchema(long schemaId, List<DataField> fields, List<String> partitionKeys) { + this.schemaId = schemaId; + this.fields = fields; + this.partitionKeys = partitionKeys; + } - private Table paimonTable; + public long getSchemaId() { + return schemaId; + } - public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) { - super(schema); - this.paimonTable = paimonTable; + public List<DataField> getFields() { + return fields; } - public Table getPaimonTable() { - return paimonTable; + public List<String> getPartitionKeys() { + return partitionKeys; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java new file mode 100644 index 00000000000..f74555b369b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; + +import com.google.common.base.Objects; + +public class PaimonSchemaCacheKey extends SchemaCacheKey { + private final long schemaId; + + public PaimonSchemaCacheKey(String dbName, String tableName, long schemaId) { + super(dbName, tableName); + this.schemaId = schemaId; + } + + public long getSchemaId() { + return schemaId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PaimonSchemaCacheKey)) { + return false; + } + if (!super.equals(o)) { + return false; + } + PaimonSchemaCacheKey that = (PaimonSchemaCacheKey) o; + return schemaId == that.schemaId; + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), schemaId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java index aaaefe7f32d..ccb530a3cbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ -20,20 +20,18 @@ package org.apache.doris.datasource.paimon; import org.apache.doris.catalog.Column; import org.apache.doris.datasource.SchemaCacheValue; -import org.apache.paimon.table.Table; - import java.util.List; public class PaimonSchemaCacheValue extends SchemaCacheValue { - private Table paimonTable; + private List<Column> partitionColumns; - public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) { + public PaimonSchemaCacheValue(List<Column> schema, List<Column> partitionColumns) { super(schema); - this.paimonTable = paimonTable; + this.partitionColumns = partitionColumns; } - public Table getPaimonTable() { - return paimonTable; + public List<Column> getPartitionColumns() { + return partitionColumns; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java similarity index 65% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java index aaaefe7f32d..4a536dd72cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java @@ -17,23 +17,20 @@ package org.apache.doris.datasource.paimon; -import org.apache.doris.catalog.Column; -import org.apache.doris.datasource.SchemaCacheValue; +public class PaimonSnapshot { + private final long snapshotId; + private final long schemaId; -import org.apache.paimon.table.Table; - -import java.util.List; - -public class PaimonSchemaCacheValue extends SchemaCacheValue { - - private Table paimonTable; + public PaimonSnapshot(long snapshotId, long schemaId) { + this.snapshotId = snapshotId; + this.schemaId = schemaId; + } - public 
PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) { - super(schema); - this.paimonTable = paimonTable; + public long getSnapshotId() { + return snapshotId; } - public Table getPaimonTable() { - return paimonTable; + public long getSchemaId() { + return schemaId; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java new file mode 100644 index 00000000000..970f111a721 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.CatalogIf; + +import java.util.Objects; +import java.util.StringJoiner; + +public class PaimonSnapshotCacheKey { + private final CatalogIf catalog; + private final String dbName; + private final String tableName; + + public PaimonSnapshotCacheKey(CatalogIf catalog, String dbName, String tableName) { + this.catalog = catalog; + this.dbName = dbName; + this.tableName = tableName; + } + + public CatalogIf getCatalog() { + return catalog; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o; + return catalog.getId() == that.catalog.getId() + && Objects.equals(dbName, that.dbName) + && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(catalog.getId(), dbName, tableName); + } + + @Override + public String toString() { + return new StringJoiner(", ", PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]") + .add("catalog=" + catalog) + .add("dbName='" + dbName + "'") + .add("tableName='" + tableName + "'") + .toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java similarity index 64% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java index aaaefe7f32d..c50ecdabfde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java @@ 
-17,23 +17,21 @@ package org.apache.doris.datasource.paimon; -import org.apache.doris.catalog.Column; -import org.apache.doris.datasource.SchemaCacheValue; +public class PaimonSnapshotCacheValue { -import org.apache.paimon.table.Table; + private final PaimonPartitionInfo partitionInfo; + private final PaimonSnapshot snapshot; -import java.util.List; - -public class PaimonSchemaCacheValue extends SchemaCacheValue { - - private Table paimonTable; + public PaimonSnapshotCacheValue(PaimonPartitionInfo partitionInfo, PaimonSnapshot snapshot) { + this.partitionInfo = partitionInfo; + this.snapshot = snapshot; + } - public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) { - super(schema); - this.paimonTable = paimonTable; + public PaimonPartitionInfo getPartitionInfo() { + return partitionInfo; } - public Table getPaimonTable() { - return paimonTable; + public PaimonSnapshot getSnapshot() { + return snapshot; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java new file mode 100644 index 00000000000..1f7576dca51 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.hive.HiveUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.JsonSerdeUtil; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Projection; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +public class PaimonUtil { + private static final Logger LOG = LogManager.getLogger(PaimonUtil.class); + + public static List<InternalRow> read( + Table table, 
@Nullable int[][] projection, @Nullable Predicate predicate, + Pair<ConfigOption<?>, String>... dynamicOptions) + throws IOException { + Map<String, String> options = new HashMap<>(); + for (Pair<ConfigOption<?>, String> pair : dynamicOptions) { + options.put(pair.getKey().key(), pair.getValue()); + } + table = table.copy(options); + ReadBuilder readBuilder = table.newReadBuilder(); + if (projection != null) { + readBuilder.withProjection(projection); + } + if (predicate != null) { + readBuilder.withFilter(predicate); + } + RecordReader<InternalRow> reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + InternalRowSerializer serializer = + new InternalRowSerializer( + projection == null + ? table.rowType() + : Projection.of(projection).project(table.rowType())); + List<InternalRow> rows = new ArrayList<>(); + reader.forEachRemaining(row -> rows.add(serializer.copy(row))); + return rows; + } + + + /* + https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table + +---------------+----------------+--------------------+--------------------+------------------------+ + | partition | record_count | file_size_in_bytes| file_count| last_update_time| + +---------------+----------------+--------------------+--------------------+------------------------+ + | [1] | 1 | 645 | 1 | 2024-06-24 10:25:57.400| + +---------------+----------------+--------------------+--------------------+------------------------+ + org.apache.paimon.table.system.PartitionsTable.TABLE_TYPE + public static final RowType TABLE_TYPE = + new RowType( + Arrays.asList( + new DataField(0, "partition", SerializationUtils.newStringType(true)), + new DataField(1, "record_count", new BigIntType(false)), + new DataField(2, "file_size_in_bytes", new BigIntType(false)), + new DataField(3, "file_count", new BigIntType(false)), + new DataField(4, "last_update_time", DataTypes.TIMESTAMP_MILLIS()))); + */ + public static PaimonPartition rowToPartition(InternalRow row) { + 
String partition = row.getString(0).toString(); + long recordCount = row.getLong(1); + long fileSizeInBytes = row.getLong(2); + long fileCount = row.getLong(3); + long lastUpdateTime = row.getTimestamp(4, 3).getMillisecond(); + return new PaimonPartition(partition, recordCount, fileSizeInBytes, fileCount, lastUpdateTime); + } + + public static PaimonPartitionInfo generatePartitionInfo(List<Column> partitionColumns, + List<PaimonPartition> paimonPartitions) throws AnalysisException { + Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap(); + Map<String, PaimonPartition> nameToPartition = Maps.newHashMap(); + PaimonPartitionInfo partitionInfo = new PaimonPartitionInfo(nameToPartitionItem, nameToPartition); + if (CollectionUtils.isEmpty(partitionColumns)) { + return partitionInfo; + } + for (PaimonPartition paimonPartition : paimonPartitions) { + String partitionName = getPartitionName(partitionColumns, paimonPartition.getPartitionValues()); + nameToPartition.put(partitionName, paimonPartition); + nameToPartitionItem.put(partitionName, toListPartitionItem(partitionName, partitionColumns)); + } + return partitionInfo; + } + + private static String getPartitionName(List<Column> partitionColumns, String partitionValueStr) { + Preconditions.checkNotNull(partitionValueStr); + String[] partitionValues = partitionValueStr.replace("[", "").replace("]", "") + .split(","); + Preconditions.checkState(partitionColumns.size() == partitionValues.length); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < partitionColumns.size(); ++i) { + if (i != 0) { + sb.append("/"); + } + sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]); + } + return sb.toString(); + } + + public static ListPartitionItem toListPartitionItem(String partitionName, List<Column> partitionColumns) + throws AnalysisException { + List<Type> types = partitionColumns.stream() + .map(Column::getType) + .collect(Collectors.toList()); + // Partition name will be 
in format: nation=cn/city=beijing + // parse it to get values "cn" and "beijing" + List<String> partitionValues = HiveUtil.toPartitionValues(partitionName); + Preconditions.checkState(partitionValues.size() == types.size(), partitionName + " vs. " + types); + List<PartitionValue> values = Lists.newArrayListWithExpectedSize(types.size()); + for (String partitionValue : partitionValues) { + // null will in partition 'null' + // "null" will in partition 'null' + // NULL will in partition 'null' + // "NULL" will in partition 'NULL' + // values.add(new PartitionValue(partitionValue, "null".equals(partitionValue))); + values.add(new PartitionValue(partitionValue, false)); + } + PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true); + ListPartitionItem listPartitionItem = new ListPartitionItem(Lists.newArrayList(key)); + return listPartitionItem; + } + + private static Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { + int tsScale = 3; // default + switch (dataType.getTypeRoot()) { + case BOOLEAN: + return Type.BOOLEAN; + case INTEGER: + return Type.INT; + case BIGINT: + return Type.BIGINT; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case SMALLINT: + return Type.SMALLINT; + case TINYINT: + return Type.TINYINT; + case VARCHAR: + case BINARY: + case CHAR: + case VARBINARY: + return Type.STRING; + case DECIMAL: + DecimalType decimal = (DecimalType) dataType; + return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale()); + case DATE: + return ScalarType.createDateV2Type(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + if (dataType instanceof org.apache.paimon.types.TimestampType) { + tsScale = ((org.apache.paimon.types.TimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { + tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); + 
if (tsScale > 6) { + tsScale = 6; + } + } + return ScalarType.createDatetimeV2Type(tsScale); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { + tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } + return ScalarType.createDatetimeV2Type(tsScale); + case ARRAY: + ArrayType arrayType = (ArrayType) dataType; + Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType()); + return org.apache.doris.catalog.ArrayType.create(innerType, true); + case MAP: + MapType mapType = (MapType) dataType; + return new org.apache.doris.catalog.MapType( + paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType())); + case ROW: + RowType rowType = (RowType) dataType; + List<DataField> fields = rowType.getFields(); + return new org.apache.doris.catalog.StructType(fields.stream() + .map(field -> new org.apache.doris.catalog.StructField(field.name(), + paimonTypeToDorisType(field.type()))) + .collect(Collectors.toCollection(ArrayList::new))); + case TIME_WITHOUT_TIME_ZONE: + return Type.UNSUPPORTED; + default: + LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot()); + return Type.UNSUPPORTED; + } + } + + public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { + return paimonPrimitiveTypeToDorisType(type); + } + + /** + * https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table + * demo: + * 0 + * [{"id":0,"name":"user_id","type":"BIGINT NOT NULL"}, + * {"id":1,"name":"item_id","type":"BIGINT"}, + * {"id":2,"name":"behavior","type":"STRING"}, + * {"id":3,"name":"dt","type":"STRING NOT NULL"}, + * {"id":4,"name":"hh","type":"STRING NOT NULL"}] + * ["dt"] + * ["dt","hh","user_id"] + * {"owner":"hadoop","provider":"paimon"} + * 2024-12-03 15:38:14.734 + * + * @param row + * @return + */ + public static PaimonSchema rowToSchema(InternalRow row) { 
+ long schemaId = row.getLong(0); + String fieldsStr = row.getString(1).toString(); + String partitionKeysStr = row.getString(2).toString(); + List<DataField> fields = JsonSerdeUtil.fromJson(fieldsStr, new TypeReference<List<DataField>>() { + }); + List<String> partitionKeys = JsonSerdeUtil.fromJson(partitionKeysStr, new TypeReference<List<String>>() { + }); + return new PaimonSchema(schemaId, fields, partitionKeys); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 885eba06ed9..a8bb814f1d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.property.constants.PaimonProperties; import org.apache.doris.thrift.TFileAttributes; @@ -36,7 +37,7 @@ public class PaimonSource { public PaimonSource(TupleDescriptor desc) { this.desc = desc; this.paimonExtTable = (PaimonExternalTable) desc.getTable(); - this.originTable = paimonExtTable.getPaimonTable(); + this.originTable = paimonExtTable.getPaimonTable(MvccUtil.getSnapshotFromContext(paimonExtTable)); } public TupleDescriptor getDesc() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 9d77ac6a6e6..42eee425269 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -30,6 +30,9 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccTableInfo; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; @@ -72,6 +75,7 @@ import java.math.RoundingMode; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -143,6 +147,8 @@ public class MTMVTask extends AbstractTask { private StmtExecutor executor; private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots; + private final Map<MvccTableInfo, MvccSnapshot> snapshots = Maps.newHashMap(); + public MTMVTask() { } @@ -231,6 +237,9 @@ public class MTMVTask extends AbstractTask { throws Exception { ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); StatementContext statementContext = new StatementContext(); + for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) { + statementContext.setSnapshot(entry.getKey(), entry.getValue()); + } ctx.setStatementContext(statementContext); TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); @@ -318,6 +327,11 @@ public class MTMVTask extends AbstractTask { MTMVBaseTableIf baseTableIf = (MTMVBaseTableIf) tableIf; baseTableIf.beforeMTMVRefresh(mtmv); } + if (tableIf instanceof MvccTable) { + MvccTable mvccTable = (MvccTable) tableIf; + MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(); + snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot); + } } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index d76451d9637..4cc4a6c8600 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -622,7 +622,11 @@ public class StatementContext implements Closeable { public void loadSnapshots() { for (TableIf tableIf : tables.values()) { if (tableIf instanceof MvccTable) { - snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) tableIf).loadSnapshot()); + MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); + // may be set by MTMV, we can not load again + if (!snapshots.containsKey(mvccTableInfo)) { + snapshots.put(mvccTableInfo, ((MvccTable) tableIf).loadSnapshot()); + } } } } @@ -630,11 +634,25 @@ public class StatementContext implements Closeable { /** * Obtain snapshot information of mvcc * - * @param mvccTable mvccTable + * @param tableIf tableIf * @return MvccSnapshot */ - public MvccSnapshot getSnapshot(MvccTable mvccTable) { - return snapshots.get(new MvccTableInfo(mvccTable)); + public Optional<MvccSnapshot> getSnapshot(TableIf tableIf) { + if (!(tableIf instanceof MvccTable)) { + return Optional.empty(); + } + MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); + return Optional.ofNullable(snapshots.get(mvccTableInfo)); + } + + /** + * Set snapshot information of mvcc + * + * @param mvccTableInfo mvccTableInfo + * @param snapshot snapshot + */ + public void setSnapshot(MvccTableInfo mvccTableInfo, MvccSnapshot snapshot) { + snapshots.put(mvccTableInfo, snapshot); } private static class CloseableResource implements Closeable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index ba8b270d1f3..e99906f5e13 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -36,7 +36,6 @@ import org.apache.commons.collections.CollectionUtils; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -74,8 +73,8 @@ public class PruneFileScanPartition extends OneRewriteRuleFactory { private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter<LogicalFileScan> filter, LogicalFileScan scan, CascadesContext ctx) { Map<String, PartitionItem> selectedPartitionItems = Maps.newHashMap(); - // todo: real snapshotId - if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) { + if (CollectionUtils.isEmpty(externalTable.getPartitionColumns( + ctx.getStatementContext().getSnapshot(externalTable)))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. 
return SelectedPartitions.NOT_PRUNED; @@ -83,8 +82,8 @@ public class PruneFileScanPartition extends OneRewriteRuleFactory { Map<String, Slot> scanOutput = scan.getOutput() .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); - // todo: real snapshotId - List<Slot> partitionSlots = externalTable.getPartitionColumns(Optional.empty()) + List<Slot> partitionSlots = externalTable.getPartitionColumns( + ctx.getStatementContext().getSnapshot(externalTable)) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 36cc0f95a77..b0a95ffdd3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -28,6 +28,8 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.analyzer.UnboundRelation; @@ -316,6 +318,11 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { partitionHasDataItems.add( ((OlapTable) targetTable).getPartitionInfo().getItem(partition.getId())); } + if (targetTable instanceof ExternalTable) { + // Add filter only when partition has data when external table + partitionHasDataItems.add(((ExternalTable) targetTable).getNameToPartitionItems( + MvccUtil.getSnapshotFromContext(targetTable)).get(partitionName)); + } } if 
(partitionHasDataItems.isEmpty()) { predicates.setNeedAddFilter(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 96b8e032d11..1f5f71f7baf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.logical; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.TableSample; @@ -60,10 +61,10 @@ public class LogicalFileScan extends LogicalCatalogRelation { } public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier, - Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) { - // todo: real snapshotId + Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) { this(id, table, qualifier, Optional.empty(), Optional.empty(), - table.initSelectedPartitions(Optional.empty()), tableSample, tableSnapshot); + table.initSelectedPartitions(MvccUtil.getSnapshotFromContext(table)), + tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java new file mode 100644 index 00000000000..789af7bf835 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.paimon.PaimonPartition; +import org.apache.doris.datasource.paimon.PaimonPartitionInfo; +import org.apache.doris.datasource.paimon.PaimonUtil; + +import com.google.common.collect.Lists; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class PaimonUtilTest { + + @Test + public void testGeneratePartitionInfo() throws AnalysisException { + Column k1 = new Column("k1", PrimitiveType.INT); + Column k2 = new Column("k2", PrimitiveType.VARCHAR); + List<Column> partitionColumns = Lists.newArrayList(k1, k2); + PaimonPartition p1 = new PaimonPartition("[1,aa]", 2, 3, 4, 5); + List<PaimonPartition> paimonPartitions = Lists.newArrayList(p1); + PaimonPartitionInfo partitionInfo = PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + String expectPartitionName 
= "k1=1/k2=aa"; + Assert.assertTrue(partitionInfo.getNameToPartitionItem().containsKey(expectPartitionName)); + PartitionItem partitionItem = partitionInfo.getNameToPartitionItem().get(expectPartitionName); + List<PartitionKey> keys = partitionItem.getItems(); + Assert.assertEquals(1, keys.size()); + PartitionKey partitionKey = keys.get(0); + List<LiteralExpr> exprs = partitionKey.getKeys(); + Assert.assertEquals(2, exprs.size()); + Assert.assertEquals(1, exprs.get(0).getLongValue()); + Assert.assertEquals("aa", exprs.get(1).getStringValue()); + } + + @Test + public void testRowToPartition() { + GenericRow row = GenericRow.of(BinaryString.fromString("[1,b]"), 2L, 3L, 4L, Timestamp.fromEpochMillis(5L)); + PaimonPartition paimonPartition = PaimonUtil.rowToPartition(row); + Assert.assertEquals("[1,b]", paimonPartition.getPartitionValues()); + Assert.assertEquals(2L, paimonPartition.getRecordCount()); + Assert.assertEquals(3L, paimonPartition.getFileSizeInBytes()); + Assert.assertEquals(4L, paimonPartition.getFileCount()); + Assert.assertEquals(5L, paimonPartition.getLastUpdateTime()); + } +} diff --git a/regression-test/data/mtmv_p0/test_paimon_mtmv.out b/regression-test/data/mtmv_p0/test_paimon_mtmv.out deleted file mode 100644 index c654cb01214..00000000000 --- a/regression-test/data/mtmv_p0/test_paimon_mtmv.out +++ /dev/null @@ -1,9 +0,0 @@ --- This file is automatically generated. 
You should know what you did if you want to edit this --- !catalog -- -1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 -10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 - --- !mtmv -- -1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 -10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 - diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy deleted file mode 100644 index e84eb497b2c..00000000000 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ /dev/null @@ -1,62 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -suite("test_paimon_mtmv", "p0,external,paimon,external_docker,external_docker_hive") { - String enabled = context.config.otherConfigs.get("enablePaimonTest") - logger.info("enabled: " + enabled) - String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - logger.info("externalEnvIp: " + externalEnvIp) - String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") - logger.info("hdfs_port: " + hdfs_port) - if (enabled != null && enabled.equalsIgnoreCase("true")) { - String catalog_name = "paimon_mtmv_catalog"; - String mvName = "test_paimon_mtmv" - String dbName = "regression_test_mtmv_p0" - String paimonDb = "db1" - String paimonTable = "all_table" - sql """drop catalog if exists ${catalog_name} """ - - sql """create catalog if not exists ${catalog_name} properties ( - "type" = "paimon", - "paimon.catalog.type"="filesystem", - "warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1" - );""" - - order_qt_catalog """select * from ${catalog_name}.${paimonDb}.${paimonTable}""" - sql """drop materialized view if exists ${mvName};""" - - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - SELECT * FROM ${catalog_name}.${paimonDb}.${paimonTable}; - """ - - sql """ - REFRESH MATERIALIZED VIEW ${mvName} complete - """ - def jobName = getJobName(dbName, mvName); - waitingMTMVTaskFinished(jobName) - order_qt_mtmv "SELECT * FROM ${mvName}" - - sql """drop materialized view if exists ${mvName};""" - sql """ drop catalog if exists ${catalog_name} """ - } -} - --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org