This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9eedda343b0993a7059abc36a0b8f207678fc04c Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Thu Jul 13 22:30:07 2023 +0800 [feature](hudi) support hudi time travel in external table (#21739) Support Hudi time travel in external tables: ``` select * from hudi_table for time as of '20230712221248'; ``` PR (https://github.com/apache/doris/pull/15418) added support for taking either a timestamp or a version as the snapshot ID in Iceberg, but Hudi only has timestamps as snapshot IDs. Therefore, querying a Hudi table with `for version as of` throws an error like: ``` ERROR 1105 (HY000): errCode = 2, detailMessage = Hudi table only supports timestamp as snapshot ID ``` The supported timestamp formats in Hudi are 'yyyy-MM-dd HH:mm:ss[.SSS]', 'yyyy-MM-dd', or 'yyyyMMddHHmmss[SSS]', which is consistent with Hudi's [time-travel-query](https://hudi.apache.org/docs/quick-start-guide#time-travel-query) documentation. ## Partitioning Strategies Before this PR, Hudi's partitions had to be synchronized to Hive through the [hive-sync-tool](https://hudi.apache.org/docs/syncing_metastore/#hive-sync-tool), or by setting very complex synchronization parameters in the [spark conf](https://hudi.apache.org/docs/syncing_metastore/#sync-template). These processes are exceptionally complex and unnecessary unless you want to query Hudi data through Hive. In addition, partitions change over time, so partition synchronization cannot guarantee the correctness of time travel. This PR therefore obtains partitions directly by reading Hudi metadata, caches and updates table partition information keyed by the Hudi instant timestamp, and reuses Doris' partition pruning. --- docs/en/docs/lakehouse/multi-catalog/hudi.md | 22 +- docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md | 22 +- fe/be-java-extensions/hudi-scanner/pom.xml | 3 +- .../org/apache/doris/hudi/BaseSplitReader.scala | 8 +- .../java/org/apache/doris/analysis/TableRef.java | 32 ++- .../org/apache/doris/analysis/TableSnapshot.java | 6 +- .../doris/catalog/HiveMetaStoreClientHelper.java | 13 +- .../java/org/apache/doris/catalog/HudiUtils.java | 34 +++ .../java/org/apache/doris/common/ErrorCode.java | 2 +- .../org/apache/doris/datasource/CatalogMgr.java | 2 +- .../doris/datasource/ExternalMetaCacheMgr.java | 21 +- .../doris/planner/external/FileQueryScanNode.java | 7 +- .../planner/external/TablePartitionValues.java | 255 +++++++++++++++++++++ .../hudi/HudiCachedPartitionProcessor.java | 131 +++++++++++ .../planner/external/hudi/HudiPartitionMgr.java | 86 +++++++ .../external/hudi/HudiPartitionProcessor.java | 124 ++++++++++ .../doris/planner/external/hudi/HudiScanNode.java | 84 ++++++- 17 files changed, 809 insertions(+), 43 deletions(-) diff --git a/docs/en/docs/lakehouse/multi-catalog/hudi.md b/docs/en/docs/lakehouse/multi-catalog/hudi.md index 890e7fd92c..4c46ccb0e1 100644 --- a/docs/en/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/en/docs/lakehouse/multi-catalog/hudi.md @@ -29,12 +29,12 @@ under the License. ## Usage -1. Doris supports Snapshot Query on Copy-on-Write Hudi tables and Read Optimized Query / Snapshot on Merge-on-Read tables. In the future, it will support Incremental Query and Time Travel. +1. The query types supported by the Hudi table are as follows, and the Incremental Query will be supported in the future. 
| Table Type | Supported Query types | | ---- | ---- | -| Copy On Write | Snapshot Query | -| Merge On Read | Snapshot Queries + Read Optimized Queries | +| Copy On Write | Snapshot Query + Time Travel | +| Merge On Read | Snapshot Queries + Read Optimized Queries + Time Travel | 2. Doris supports Hive Metastore(Including catalogs compatible with Hive MetaStore, like [AWS Glue](./hive.md)/[Alibaba DLF](./dlf.md)) Catalogs. @@ -82,3 +82,19 @@ Users can view the perfomace of Java SDK through [profile](../../admin-manual/ht 2. `JavaScanTime`: Time to read data by Java SDK 3. `FillBlockTime`: Time co convert Java column data into C++ column data 4. `GetRecordReaderTime`: Time to create and initialize Hudi Record Reader + +## Time Travel + +Supports reading snapshots specified in Hudi table. + +Every write operation to the Hudi table will generate a new snapshot. + +By default, query requests will only read the latest version of the snapshot. + +You can use the `FOR TIME AS OF` statement, based on the time of the snapshot to read historical version data. Examples are as follows: + +`SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";` + +`SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";` + +Hudi table does not support the `FOR VERSION AS OF` statement. Using this syntax to query the Hudi table will throw an error. diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md index d93c4268d8..228be87420 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md @@ -29,12 +29,12 @@ under the License. ## 使用限制 -1. 目前支持 Copy On Write 表的 Snapshot Query,以及 Merge On Read 表的 Snapshot Queries 和 Read Optimized Query。后续将支持 Incremental Query 和 Time Travel。 +1. Hudi 表支持的查询类型如下,后续将支持 Incremental Query。 | 表类型 | 支持的查询类型 | | ---- | ---- | -| Copy On Write | Snapshot Query | -| Merge On Read | Snapshot Queries + Read Optimized Queries | +| Copy On Write | Snapshot Query + Time Travel | +| Merge On Read | Snapshot Queries + Read Optimized Queries + Time Travel | 2. 目前支持 Hive Metastore 和兼容 Hive Metastore 类型(例如[AWS Glue](./hive.md)/[Alibaba DLF](./dlf.md))的 Catalog。 @@ -83,3 +83,19 @@ Doris 使用 parquet native reader 读取 COW 表的数据文件,使用 Java S 2. `JavaScanTime`: Java SDK 读取数据的时间 3. `FillBlockTime`: Java 数据拷贝为 C++ 数据的时间 4. `GetRecordReaderTime`: 调用 Java SDK 并创建 Hudi Record Reader 的时间 + +## Time Travel + +支持读取 Hudi 表指定的 Snapshot。 + +每一次对 Hudi 表的写操作都会产生一个新的快照。 + +默认情况下,查询请求只会读取最新版本的快照。 + +可以使用 `FOR TIME AS OF` 语句,根据快照的时间([时间格式](https://hudi.apache.org/docs/quick-start-guide#time-travel-query)和Hudi官网保持一致)读取历史版本的数据。示例如下: + +`SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";` + +`SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";` + +Hudi 表不支持 `FOR VERSION AS OF` 语句,使用该语法查询 Hudi 表将抛错。 diff --git a/fe/be-java-extensions/hudi-scanner/pom.xml b/fe/be-java-extensions/hudi-scanner/pom.xml index 1b19da9887..b27c269c23 100644 --- a/fe/be-java-extensions/hudi-scanner/pom.xml +++ b/fe/be-java-extensions/hudi-scanner/pom.xml @@ -36,7 +36,6 @@ under the License. <sparkbundle.version>3.2</sparkbundle.version> <hudi.version>0.13.0</hudi.version> <janino.version>3.0.16</janino.version> - <fasterxml.jackson.version>2.14.3</fasterxml.jackson.version> </properties> <dependencies> @@ -163,7 +162,7 @@ under the License. 
<!-- version of spark's jackson module is error --> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> - <version>${fasterxml.jackson.version}</version> + <version>${jackson.version}</version> <exclusions> <exclusion> <groupId>com.google.guava</groupId> diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala index e2ac89fab8..a4f67feddf 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala @@ -211,9 +211,7 @@ abstract class BaseSplitReader(val split: HoodieSplit) { protected lazy val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) - protected lazy val specifiedQueryTimestamp: Option[String] = - optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) - .map(HoodieSqlCommonUtils.formatQueryInstant) + protected lazy val specifiedQueryTimestamp: Option[String] = Some(split.instantTime) private def queryTimestamp: Option[String] = specifiedQueryTimestamp.orElse(toScalaOption(timeline.lastInstant()).map(_.getTimestamp)) @@ -624,9 +622,7 @@ object BaseSplitReader { // NOTE: We're including compaction here since it's not considering a "commit" operation val timeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants - val specifiedQueryTimestamp: Option[String] = - split.optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) - .map(HoodieSqlCommonUtils.formatQueryInstant) + val specifiedQueryTimestamp: Option[String] = Some(split.instantTime) val schemaResolver = new TableSchemaResolver(metaClient) val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(split.optParams, sparkSession)) { None diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index fcfbd39b44..730826fa2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -21,6 +21,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HudiUtils; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.HMSExternalTable; @@ -537,15 +538,28 @@ public class TableRef implements ParseNode, Writable { ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE); } HMSExternalTable extTable = (HMSExternalTable) this.getTable(); - if (extTable.getDlaType() != HMSExternalTable.DLAType.ICEBERG) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE); - } - if (tableSnapshot.getType() == TableSnapshot.VersionType.TIME) { - String asOfTime = tableSnapshot.getTime(); - Matcher matcher = TimeUtils.DATETIME_FORMAT_REG.matcher(asOfTime); - if (!matcher.matches()) { - throw new AnalysisException("Invalid datetime string: " + asOfTime); - } + switch (extTable.getDlaType()) { + case ICEBERG: + if (tableSnapshot.getType() == TableSnapshot.VersionType.TIME) { + String asOfTime = tableSnapshot.getTime(); + Matcher matcher = TimeUtils.DATETIME_FORMAT_REG.matcher(asOfTime); + if (!matcher.matches()) { + throw new AnalysisException("Invalid datetime string: " + asOfTime); + } + } 
+ break; + case HUDI: + if (tableSnapshot.getType() == TableSnapshot.VersionType.VERSION) { + throw new AnalysisException("Hudi table only supports timestamp as snapshot ID"); + } + try { + tableSnapshot.setTime(HudiUtils.formatQueryInstant(tableSnapshot.getTime())); + } catch (Exception e) { + throw new AnalysisException("Failed to parse hudi timestamp: " + e.getMessage(), e); + } + break; + default: + ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java index e5d43b6d0b..0a851a9fd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java @@ -19,7 +19,7 @@ package org.apache.doris.analysis; /** * Snapshot read for time travel - * the version in 2022.12.28 just supports external iceberg table + * supports external iceberg/hudi table */ public class TableSnapshot { @@ -55,6 +55,10 @@ public class TableSnapshot { return time; } + public void setTime(String time) { + this.time = time; + } + public long getVersion() { return version; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 232ce4f285..fbb3c71b29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -922,10 +922,7 @@ public class HiveMetaStoreClientHelper { return hudiSchema; } - public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) { - String hudiBasePath = table.getRemoteTable().getSd().getLocation(); - - Configuration conf = getConfiguration(table); + public static UserGroupInformation getUserGroupInformation(Configuration conf) { UserGroupInformation ugi = null; String authentication = conf.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, null); if (AuthType.KERBEROS.getDesc().equals(authentication)) { @@ -945,6 +942,14 @@ public class HiveMetaStoreClientHelper { ugi = UserGroupInformation.createRemoteUser(hadoopUserName); } } + return ugi; + } + + public static HoodieTableMetaClient getHudiClient(HMSExternalTable table) { + String hudiBasePath = table.getRemoteTable().getSd().getLocation(); + + Configuration conf = getConfiguration(table); + UserGroupInformation ugi = getUserGroupInformation(conf); HoodieTableMetaClient metaClient; if (ugi != null) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java index 0799f7b137..e52b52ab8e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HudiUtils.java @@ -22,12 +22,18 @@ import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; public class HudiUtils { + private static final SimpleDateFormat defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd"); + public static String 
fromAvroHudiTypeToHiveTypeString(Schema avroSchema) { Schema.Type columnType = avroSchema.getType(); LogicalType logicalType = avroSchema.getLogicalType(); @@ -166,4 +172,32 @@ public class HudiUtils { } return Type.UNSUPPORTED; } + + /** + * Convert different query instant time format to the commit time format. + * Currently we support three kinds of instant time format for time travel query: + * 1、yyyy-MM-dd HH:mm:ss + * 2、yyyy-MM-dd + * This will convert to 'yyyyMMdd000000'. + * 3、yyyyMMddHHmmss + */ + public static String formatQueryInstant(String queryInstant) throws ParseException { + int instantLength = queryInstant.length(); + if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS] + if (instantLength == 19) { + queryInstant += ".000"; + } + return HoodieInstantTimeGenerator.getInstantForDateString(queryInstant); + } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH + || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS] + HoodieActiveTimeline.parseDateFromInstantTime(queryInstant); // validate the format + return queryInstant; + } else if (instantLength == 10) { // for yyyy-MM-dd + return HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant)); + } else { + throw new IllegalArgumentException("Unsupported query instant time format: " + queryInstant + + ", Supported time format are: 'yyyy-MM-dd HH:mm:ss[.SSS]' " + + "or 'yyyy-MM-dd' or 'yyyyMMddHHmmss[SSS]'"); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java index f773bfd5ff..c827a41314 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java @@ -1186,7 +1186,7 @@ public enum ErrorCode { ERR_TABLE_NAME_LENGTH_LIMIT(5089, new byte[]{'4', '2', '0', '0', '0'}, "Table name length exceeds limit, " + "the length of table name '%s' is %d which is greater than the configuration 'table_name_length_limit' (%d)."), - ERR_NONSUPPORT_TIME_TRAVEL_TABLE(5090, new byte[]{'4', '2', '0', '0', '0'}, "Only iceberg external" + ERR_NONSUPPORT_TIME_TRAVEL_TABLE(5090, new byte[]{'4', '2', '0', '0', '0'}, "Only iceberg/hudi external" + " table supports time travel in current version"), ERR_NONSSL_HANDSHAKE_RESPONSE(5091, new byte[] {'4', '2', '0', '0'}, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index 96a7c1eae3..3ed74e260b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -128,7 +128,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable { catalog.onClose(); nameToCatalog.remove(catalog.getName()); lastDBOfCatalog.remove(catalog.getName()); - Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getName()); + Env.getCurrentEnv().getExtMetaCacheMgr().removeCache(catalog.getId()); if (!Strings.isNullOrEmpty(catalog.getResource())) { Resource catalogResource = Env.getCurrentEnv().getResourceMgr().getResource(catalog.getResource()); if (catalogResource != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index f5ca819c3b..16ffcc71f6 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -23,6 +23,8 @@ import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.datasource.hive.HiveMetaStoreCache; +import org.apache.doris.planner.external.hudi.HudiPartitionMgr; +import org.apache.doris.planner.external.hudi.HudiPartitionProcessor; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -41,9 +43,11 @@ public class ExternalMetaCacheMgr { private static final Logger LOG = LogManager.getLogger(ExternalMetaCacheMgr.class); // catalog id -> HiveMetaStoreCache - private Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap(); + private final Map<Long, HiveMetaStoreCache> cacheMap = Maps.newConcurrentMap(); // catalog id -> table schema cache private Map<Long, ExternalSchemaCache> schemaCacheMap = Maps.newHashMap(); + // hudi partition manager + private final HudiPartitionMgr hudiPartitionMgr; private ExecutorService executor; public ExternalMetaCacheMgr() { @@ -51,6 +55,7 @@ public class ExternalMetaCacheMgr { Config.max_external_cache_loader_thread_pool_size, Config.max_external_cache_loader_thread_pool_size * 1000, "ExternalMetaCacheMgr", 120, true); + hudiPartitionMgr = HudiPartitionMgr.get(executor); } public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) { @@ -79,13 +84,18 @@ public class ExternalMetaCacheMgr { return cache; } - public void removeCache(String catalogId) { + public HudiPartitionProcessor getHudiPartitionProcess(ExternalCatalog catalog) { + return hudiPartitionMgr.getPartitionProcessor(catalog); + } + + public void removeCache(long catalogId) { if (cacheMap.remove(catalogId) != null) { - LOG.info("remove hive metastore cache for catalog {}" + catalogId); + LOG.info("remove hive metastore cache for catalog {}", catalogId); } if (schemaCacheMap.remove(catalogId) != null) { - LOG.info("remove schema cache for catalog {}" + catalogId); + LOG.info("remove schema cache for catalog {}", catalogId); } + hudiPartitionMgr.removePartitionProcessor(catalogId); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -98,6 +108,7 @@ public class ExternalMetaCacheMgr { if (metaCache != null) { metaCache.invalidateTableCache(dbName, tblName); } + hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName); LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId); } @@ -111,6 +122,7 @@ public class ExternalMetaCacheMgr { if (metaCache != null) { metaCache.invalidateDbCache(dbName); } + hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName); LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId); } @@ -123,6 +135,7 @@ public class ExternalMetaCacheMgr { if (metaCache != null) { metaCache.invalidateAll(); } + hudiPartitionMgr.cleanPartitionProcess(catalogId); LOG.debug("invalid catalog cache for {}", catalogId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index 7a7dd76ab7..09ed6361e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -252,9 +252,10 @@ public abstract class FileQueryScanNode extends 
FileScanNode { // set hdfs params for hdfs file type. Map<String, String> locationProperties = getLocationProperties(); - if (fileFormatType == TFileFormatType.FORMAT_JNI) { + if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) { scanRangeParams.setProperties(locationProperties); - } else if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { + } + if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) { String fsName = getFsName(fileSplit); THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties); tHdfsParams.setFsName(fsName); @@ -267,8 +268,6 @@ public abstract class FileQueryScanNode extends FileScanNode { } scanRangeParams.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port)); } - } else if (locationType == TFileType.FILE_S3) { - scanRangeParams.setProperties(locationProperties); } TScanRangeLocations curLocations = newLocations(scanRangeParams); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java new file mode 100644 index 0000000000..bcc967501c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java @@ -0,0 +1,255 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
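The FileQueryScanNode change above decouples two previously exclusive branches: JNI-format splits (such as Hudi Merge-on-Read splits) and S3 locations both receive the raw location properties, while HDFS and broker locations additionally receive HDFS parameters, so a JNI split stored on HDFS now gets both. A minimal, self-contained sketch of that dispatch, using simplified stand-in types rather than the real Doris thrift classes:

```java
// Simplified illustration of the new scan-range property dispatch; enum and
// class names are placeholders, not the actual Doris/thrift types.
import java.util.HashMap;
import java.util.Map;

public class ScanRangeParamsSketch {
    enum FileFormatType { FORMAT_JNI, FORMAT_PARQUET }
    enum FileLocationType { FILE_HDFS, FILE_S3, FILE_BROKER, FILE_LOCAL }

    static class ScanRangeParams {
        Map<String, String> properties = new HashMap<>();
        Map<String, String> hdfsParams = new HashMap<>();
    }

    static ScanRangeParams build(FileFormatType format, FileLocationType location,
                                 Map<String, String> locationProperties) {
        ScanRangeParams params = new ScanRangeParams();
        // JNI readers and S3 locations both need the raw location properties;
        // the two conditions are no longer mutually exclusive.
        if (format == FileFormatType.FORMAT_JNI || location == FileLocationType.FILE_S3) {
            params.properties.putAll(locationProperties);
        }
        // HDFS and broker locations additionally need HDFS parameters,
        // even when the split is read through the JNI scanner.
        if (location == FileLocationType.FILE_HDFS || location == FileLocationType.FILE_BROKER) {
            params.hdfsParams.putAll(locationProperties);
            params.hdfsParams.put("fs.name", locationProperties.getOrDefault("fs.defaultFS", ""));
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("fs.defaultFS", "hdfs://nameservice1");
        ScanRangeParams p = build(FileFormatType.FORMAT_JNI, FileLocationType.FILE_HDFS, props);
        System.out.println(p.properties + " / " + p.hdfsParams);
    }
}
```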
+ +package org.apache.doris.planner.external; + +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.planner.ColumnBound; +import org.apache.doris.planner.ListPartitionPrunerV2; +import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import lombok.Data; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +@Data +public class TablePartitionValues { + public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__"; + + private final ReadWriteLock readWriteLock; + private long lastUpdateTimestamp; + private long nextPartitionId; + private final Map<Long, PartitionItem> idToPartitionItem; + private final Map<String, Long> partitionNameToIdMap; + private final Map<Long, String> partitionIdToNameMap; + + private Map<Long, List<UniqueId>> idToUniqueIdsMap; + private Map<Long, List<String>> partitionValuesMap; + //multi pair + private Map<UniqueId, Range<PartitionKey>> uidToPartitionRange; + private Map<Range<PartitionKey>, UniqueId> rangeToId; + //single pair + private RangeMap<ColumnBound, UniqueId> singleColumnRangeMap; + private Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap; + + public TablePartitionValues() { + readWriteLock = new ReentrantReadWriteLock(); + lastUpdateTimestamp = 0; + nextPartitionId = 0; + idToPartitionItem = new HashMap<>(); + partitionNameToIdMap = new HashMap<>(); + partitionIdToNameMap = new HashMap<>(); + } + + public TablePartitionValues(List<String> partitionNames, List<List<String>> partitionValues, List<Type> types) { + this(); + addPartitions(partitionNames, partitionValues, types); + } + + public TablePartitionValues(List<String> partitionNames, List<Type> types) { + this(); + addPartitions(partitionNames, types); + } + + public void addPartitions(List<String> partitionNames, List<List<String>> partitionValues, List<Type> types) { + Preconditions.checkState(partitionNames.size() == partitionValues.size()); + List<String> addPartitionNames = new ArrayList<>(); + List<PartitionItem> addPartitionItems = new ArrayList<>(); + partitionNameToIdMap.forEach((partitionName, partitionId) -> { + addPartitionNames.add(partitionName); + addPartitionItems.add(idToPartitionItem.get(partitionId)); + }); + + for (int i = 0; i < partitionNames.size(); i++) { + if (!partitionNameToIdMap.containsKey(partitionNames.get(i))) { + addPartitionNames.add(partitionNames.get(i)); + addPartitionItems.add(toListPartitionItem(partitionValues.get(i), types)); + } + } + cleanPartitions(); + + addPartitionItems(addPartitionNames, addPartitionItems, types); + } + + public void addPartitions(List<String> partitionNames, List<Type> types) { + addPartitions(partitionNames, + 
partitionNames.stream().map(this::getHivePartitionValues).collect(Collectors.toList()), types); + } + + private void addPartitionItems(List<String> partitionNames, List<PartitionItem> partitionItems, List<Type> types) { + Preconditions.checkState(partitionNames.size() == partitionItems.size()); + Preconditions.checkState(nextPartitionId == 0); + for (int i = 0; i < partitionNames.size(); i++) { + long partitionId = nextPartitionId++; + idToPartitionItem.put(partitionId, partitionItems.get(i)); + partitionNameToIdMap.put(partitionNames.get(i), partitionId); + partitionIdToNameMap.put(partitionId, partitionNames.get(i)); + } + + // create a new map for partitionId <---> uniqueId + idToUniqueIdsMap = new HashMap<>(); + + if (types.size() > 1) { + // uidToPartitionRange and rangeToId are only used for multi-column partition + uidToPartitionRange = ListPartitionPrunerV2.genUidToPartitionRange(idToPartitionItem, idToUniqueIdsMap); + rangeToId = ListPartitionPrunerV2.genRangeToId(uidToPartitionRange); + } else { + Preconditions.checkState(types.size() == 1); + // singleColumnRangeMap is only used for single-column partition + singleColumnRangeMap = ListPartitionPrunerV2.genSingleColumnRangeMap(idToPartitionItem, idToUniqueIdsMap); + singleUidToColumnRangeMap = ListPartitionPrunerV2.genSingleUidToColumnRange(singleColumnRangeMap); + } + partitionValuesMap = ListPartitionPrunerV2.getPartitionValuesMap(idToPartitionItem); + } + + public void dropPartitions(List<String> partitionNames, List<Type> types) { + partitionNames.forEach(p -> { + Long removedPartition = partitionNameToIdMap.get(p); + if (removedPartition != null) { + idToPartitionItem.remove(removedPartition); + } + }); + List<String> remainingPartitionNames = new ArrayList<>(); + List<PartitionItem> remainingPartitionItems = new ArrayList<>(); + partitionNameToIdMap.forEach((partitionName, partitionId) -> { + remainingPartitionNames.add(partitionName); + remainingPartitionItems.add(idToPartitionItem.get(partitionId)); + }); + cleanPartitions(); + addPartitionItems(remainingPartitionNames, remainingPartitionItems, types); + } + + public long getLastUpdateTimestamp() { + return lastUpdateTimestamp; + } + + public void setLastUpdateTimestamp(long lastUpdateTimestamp) { + this.lastUpdateTimestamp = lastUpdateTimestamp; + } + + public Lock readLock() { + return readWriteLock.readLock(); + } + + public Lock writeLock() { + return readWriteLock.writeLock(); + } + + private void cleanPartitions() { + nextPartitionId = 0; + idToPartitionItem.clear(); + partitionNameToIdMap.clear(); + partitionIdToNameMap.clear(); + + idToUniqueIdsMap = null; + partitionValuesMap = null; + uidToPartitionRange = null; + rangeToId = null; + singleColumnRangeMap = null; + singleUidToColumnRangeMap = null; + } + + private ListPartitionItem toListPartitionItem(List<String> partitionValues, List<Type> types) { + Preconditions.checkState(partitionValues.size() == types.size()); + try { + PartitionKey key = PartitionKey.createListPartitionKeyWithTypes( + partitionValues.stream().map(p -> new PartitionValue(p, HIVE_DEFAULT_PARTITION.equals(p))) + .collect(Collectors.toList()), + types); + return new ListPartitionItem(Lists.newArrayList(key)); + } catch (AnalysisException e) { + throw new CacheException("failed to convert partition %s to list partition", + e, partitionValues); + } + } + + private List<String> getHivePartitionValues(String partitionName) { + // Partition name will be in format: nation=cn/city=beijing + // parse it to get values "cn" and "beijing" + return 
Arrays.stream(partitionName.split("/")).map(part -> { + String[] kv = part.split("="); + Preconditions.checkState(kv.length == 2, partitionName); + String partitionValue; + try { + // hive partition value maybe contains special characters like '=' and '/' + partitionValue = URLDecoder.decode(kv[1], StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + // It should not be here + throw new RuntimeException(e); + } + return partitionValue; + }).collect(Collectors.toList()); + } + + @Data + public static class TablePartitionKey { + private String dbName; + private String tblName; + // not in key + private List<Type> types; + + public TablePartitionKey(String dbName, String tblName, List<Type> types) { + this.dbName = dbName; + this.tblName = tblName; + this.types = types; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof TablePartitionKey)) { + return false; + } + return dbName.equals(((TablePartitionKey) obj).dbName) + && tblName.equals(((TablePartitionKey) obj).tblName); + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tblName); + } + + @Override + public String toString() { + return "TablePartitionKey{" + "dbName='" + dbName + '\'' + ", tblName='" + tblName + '\'' + '}'; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java new file mode 100644 index 0000000000..ab6a8839b5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
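The getHivePartitionValues helper above turns a Hive-style partition name such as `nation=cn/city=beijing` into its value list, URL-decoding each value. A standalone, JDK-only sketch of the same parsing (class and method names here are illustrative, not the Doris code itself):

```java
// Parse "key=value/key=value" partition names into values, mirroring the logic above.
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PartitionNameParser {
    public static List<String> parseValues(String partitionName) {
        List<String> values = new ArrayList<>();
        for (String part : partitionName.split("/")) {
            String[] kv = part.split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Malformed partition name: " + partitionName);
            }
            try {
                // Values may carry URL-encoded special characters such as '=' or '/'.
                values.add(URLDecoder.decode(kv[1], StandardCharsets.UTF_8.name()));
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e); // UTF-8 is always available
            }
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(parseValues("nation=cn/city=bei%2Fjing")); // prints [cn, bei/jing]
    }
}
```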
+ +package org.apache.doris.planner.external.hudi; + +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.common.Config; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.planner.external.TablePartitionValues; +import org.apache.doris.planner.external.TablePartitionValues.TablePartitionKey; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { + private final long catalogId; + private final LoadingCache<TablePartitionKey, TablePartitionValues> partitionCache; + + public HudiCachedPartitionProcessor(long catalogId, Executor executor) { + this.catalogId = catalogId; + this.partitionCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_table_cache_num) + .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES) + .build(CacheLoader.asyncReloading( + new CacheLoader<TablePartitionKey, TablePartitionValues>() { + @Override + public TablePartitionValues load(TablePartitionKey key) throws Exception { + return new TablePartitionValues(); + } + }, executor)); + } + + @Override + public void cleanUp() { + partitionCache.cleanUp(); + } + + @Override + public void cleanDatabasePartitions(String dbName) { + partitionCache.asMap().keySet().stream().filter(k -> k.getDbName().equals(dbName)).collect(Collectors.toList()) + .forEach(partitionCache::invalidate); + + } + + @Override + public void cleanTablePartitions(String dbName, String tblName) { + partitionCache.asMap().keySet().stream() + .filter(k -> k.getDbName().equals(dbName) && k.getTblName().equals(tblName)) + .collect(Collectors.toList()) + .forEach(partitionCache::invalidate); + } + + public TablePartitionValues getPartitionValues(HMSExternalTable table, HoodieTableMetaClient tableMetaClient) + throws CacheException { + assert (catalogId == table.getCatalog().getId()); + Option<String[]> partitionColumns = tableMetaClient.getTableConfig().getPartitionFields(); + if (!partitionColumns.isPresent()) { + return null; + } + HoodieTimeline timeline = tableMetaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); + Option<HoodieInstant> lastInstant = timeline.lastInstant(); + if (!lastInstant.isPresent()) { + return null; + } + try { + long lastTimestamp = Long.parseLong(lastInstant.get().getTimestamp()); + TablePartitionValues partitionValues = partitionCache.get( + new TablePartitionKey(table.getDbName(), table.getName(), table.getPartitionColumnTypes())); + partitionValues.readLock().lock(); + try { + long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp(); + if (lastTimestamp == lastUpdateTimestamp) { + return partitionValues; + } + assert (lastTimestamp > lastUpdateTimestamp); + } finally { + partitionValues.readLock().unlock(); + } + + partitionValues.writeLock().lock(); + try { + long lastUpdateTimestamp = partitionValues.getLastUpdateTimestamp(); + if (lastTimestamp == lastUpdateTimestamp) { + return partitionValues; + } + assert (lastTimestamp > lastUpdateTimestamp); + 
List<String> partitionNames; + if (lastUpdateTimestamp == 0) { + partitionNames = getAllPartitionNames(tableMetaClient); + } else { + partitionNames = getPartitionNamesInRange(timeline, String.valueOf(lastUpdateTimestamp), + String.valueOf(lastTimestamp)); + } + List<String> partitionColumnsList = Arrays.asList(partitionColumns.get()); + partitionValues.addPartitions(partitionNames, + partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p)) + .collect(Collectors.toList()), table.getPartitionColumnTypes()); + partitionValues.setLastUpdateTimestamp(lastTimestamp); + return partitionValues; + } finally { + partitionValues.writeLock().unlock(); + } + } catch (Exception e) { + throw new CacheException("Failed to get hudi partitions", e); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionMgr.java new file mode 100644 index 0000000000..4956a6f58c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionMgr.java @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
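getPartitionValues above follows a check-then-refresh pattern: compare the cached last-update instant with the table's latest instant under a read lock, and only take the write lock (re-checking first) when the timeline has advanced, loading all partitions on the first fill and only the partitions written in the new instant range afterwards. A minimal sketch of that locking pattern, with placeholder names and a simplified refresh body:

```java
// Check under a read lock, refresh under a write lock with a re-check; the
// refresh input here stands in for the partitions found in the new instant range.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class IncrementalPartitionCache {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private long lastUpdateTimestamp = 0L;               // last Hudi instant already applied
    private final List<String> partitionNames = new ArrayList<>();

    public List<String> get(long latestInstant, List<String> newPartitionsSinceLastUpdate) {
        lock.readLock().lock();
        try {
            if (latestInstant == lastUpdateTimestamp) {
                return new ArrayList<>(partitionNames);  // cache is already up to date
            }
        } finally {
            lock.readLock().unlock();
        }
        lock.writeLock().lock();
        try {
            // Re-check: another thread may have refreshed while we waited for the lock.
            if (latestInstant != lastUpdateTimestamp) {
                // First load pulls every partition; later loads only add partitions
                // touched by instants in (lastUpdateTimestamp, latestInstant].
                partitionNames.addAll(newPartitionsSinceLastUpdate);
                lastUpdateTimestamp = latestInstant;
            }
            return new ArrayList<>(partitionNames);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        IncrementalPartitionCache cache = new IncrementalPartitionCache();
        System.out.println(cache.get(20230712221248L, Arrays.asList("dt=2023-07-12")));
        System.out.println(cache.get(20230712221248L, Collections.emptyList())); // cache hit
    }
}
```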
+ +package org.apache.doris.planner.external.hudi; + +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.HMSExternalCatalog; + +import com.google.common.collect.Maps; + +import java.util.Map; +import java.util.concurrent.Executor; + +public class HudiPartitionMgr { + private static volatile HudiPartitionMgr partitionMgr = null; + + private static final Map<Long, HudiPartitionProcessor> partitionProcessors = Maps.newConcurrentMap(); + private final Executor executor; + + private HudiPartitionMgr(Executor executor) { + this.executor = executor; + } + + public HudiPartitionProcessor getPartitionProcessor(ExternalCatalog catalog) { + return partitionProcessors.computeIfAbsent(catalog.getId(), catalogId -> { + if (catalog instanceof HMSExternalCatalog) { + return new HudiCachedPartitionProcessor(catalogId, executor); + } else { + throw new RuntimeException("Hudi only supports hive(or compatible) catalog now"); + } + }); + } + + public void removePartitionProcessor(long catalogId) { + HudiPartitionProcessor processor = partitionProcessors.remove(catalogId); + if (processor != null) { + processor.cleanUp(); + } + } + + public void cleanPartitionProcess(long catalogId) { + HudiPartitionProcessor processor = partitionProcessors.get(catalogId); + if (processor != null) { + processor.cleanUp(); + } + } + + public void cleanDatabasePartitions(long catalogId, String dbName) { + HudiPartitionProcessor processor = partitionProcessors.get(catalogId); + if (processor != null) { + processor.cleanDatabasePartitions(dbName); + } + } + + public void cleanTablePartitions(long catalogId, String dbName, String tblName) { + HudiPartitionProcessor processor = partitionProcessors.get(catalogId); + if (processor != null) { + processor.cleanTablePartitions(dbName, tblName); + } + } + + public static HudiPartitionMgr get(Executor executor) { + if (partitionMgr == null) { + synchronized (HudiPartitionMgr.class) { + if (partitionMgr == null) { + partitionMgr = new HudiPartitionMgr(executor); + } + } + } + return partitionMgr; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java new file mode 100644 index 0000000000..3be3e1f080 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiPartitionProcessor.java @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
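HudiPartitionMgr above is a lazily created singleton that holds one partition processor per catalog: double-checked locking on a volatile field for the manager itself, and a concurrent map keyed by catalog id for the processors. A compact sketch of that structure, with a placeholder processor type instead of the real class:

```java
// Lazily-initialized singleton plus per-catalog processor map; the processor is
// a placeholder Object here, and the executor stands in for the shared cache loader pool.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

public class PartitionMgrSketch {
    private static volatile PartitionMgrSketch instance;
    private final Map<Long, Object> processors = new ConcurrentHashMap<>();
    private final Executor executor;                  // used to load caches asynchronously

    private PartitionMgrSketch(Executor executor) {
        this.executor = executor;
    }

    public static PartitionMgrSketch get(Executor executor) {
        if (instance == null) {                       // first check, no lock
            synchronized (PartitionMgrSketch.class) {
                if (instance == null) {               // second check, under lock
                    instance = new PartitionMgrSketch(executor);
                }
            }
        }
        return instance;
    }

    public Object getProcessor(long catalogId) {
        // One processor per catalog, created on first use and shared afterwards.
        return processors.computeIfAbsent(catalogId, id -> new Object());
    }

    public void removeProcessor(long catalogId) {
        processors.remove(catalogId);                 // dropped when the catalog is removed
    }

    public static void main(String[] args) {
        PartitionMgrSketch mgr = PartitionMgrSketch.get(Runnable::run);
        System.out.println(mgr.getProcessor(1L) == mgr.getProcessor(1L)); // true: shared per catalog
    }
}
```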
+ +package org.apache.doris.planner.external.hudi; + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public abstract class HudiPartitionProcessor { + + public abstract void cleanUp(); + + public abstract void cleanDatabasePartitions(String dbName); + + public abstract void cleanTablePartitions(String dbName, String tblName); + + public String[] getPartitionColumns(HoodieTableMetaClient tableMetaClient) { + return tableMetaClient.getTableConfig().getPartitionFields().get(); + } + + public List<String> getAllPartitionNames(HoodieTableMetaClient tableMetaClient) throws IOException { + TypedProperties configProperties = new TypedProperties(); + HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(configProperties) + .enable(configProperties.getBoolean(HoodieMetadataConfig.ENABLE.key(), + HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS) + && HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) + .build(); + + HoodieTableMetadata newTableMetadata = HoodieTableMetadata.create( + new HoodieLocalEngineContext(tableMetaClient.getHadoopConf()), metadataConfig, + tableMetaClient.getBasePathV2().toString(), + FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true); + + return newTableMetadata.getPartitionPathWithPathPrefixes(Collections.singletonList("")); + } + + public List<String> getPartitionNamesInRange(HoodieTimeline timeline, String startTimestamp, String endTimestamp) { + return new ArrayList<>(HoodieInputFormatUtils.getWritePartitionPaths( + timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants().stream() + .map(instant -> { + try { + return TimelineUtils.getCommitMetadata(instant, timeline); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + }).collect(Collectors.toList()))); + } + + public static List<String> parsePartitionValues(List<String> partitionColumns, String partitionPath) { + if (partitionColumns.size() == 0) { + // This is a non-partitioned table + return Collections.emptyList(); + } + String[] partitionFragments = partitionPath.split("/"); + if (partitionFragments.length != partitionColumns.size()) { + if (partitionColumns.size() == 1) { + // If the partition column size is not equal to the partition fragment size + // and the partition column size is 1, we map the whole partition path + // to the partition column which can benefit from the partition prune. 
+ String prefix = partitionColumns.get(0) + "="; + String partitionValue; + if (partitionPath.startsWith(prefix)) { + // support hive style partition path + partitionValue = partitionPath.substring(prefix.length()); + } else { + partitionValue = partitionPath; + } + // TODO: In hive, the specific characters like '=', '/' will be url encoded + return Collections.singletonList(partitionValue); + } else { + // If the partition column size is not equal to the partition fragments size + // and the partition column size > 1, we do not know how to map the partition + // fragments to the partition columns and therefore return an empty tuple. We don't + // fail outright so that in some cases we can fallback to reading the table as non-partitioned + // one + throw new RuntimeException("Failed to parse partition values of path: " + partitionPath); + } + } else { + // If partitionSeqs.length == partitionSchema.fields.length + // Append partition name to the partition value if the + // HIVE_STYLE_PARTITIONING is disable. + // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" + List<String> partitionValues = new ArrayList<>(partitionFragments.length); + for (int i = 0; i < partitionFragments.length; i++) { + String prefix = partitionColumns.get(i) + "="; + if (partitionFragments[i].startsWith(prefix)) { + partitionValues.add(partitionFragments[i].substring(prefix.length())); + } else { + partitionValues.add(partitionFragments[i]); + } + } + return partitionValues; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java index 3c4fb0d1fa..734b3943a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiScanNode.java @@ -18,16 +18,21 @@ package org.apache.doris.planner.external.hudi; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.HudiUtils; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.Type; import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.hive.HivePartition; +import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.external.FileSplit; import org.apache.doris.planner.external.HiveScanNode; import org.apache.doris.planner.external.TableFormatType; +import org.apache.doris.planner.external.TablePartitionValues; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; @@ -36,10 +41,12 @@ import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.THudiFileDesc; import org.apache.doris.thrift.TTableFormatFileDesc; +import com.google.common.collect.Lists; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; @@ -54,7 +61,10 @@ import org.apache.hudi.common.util.Option; import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; @@ -137,6 +147,53 @@ public class HudiScanNode extends HiveScanNode { rangeDesc.setTableFormatParams(tableFormatFileDesc); } + private List<HivePartition> getPrunedPartitions(HoodieTableMetaClient metaClient) throws AnalysisException { + List<Type> partitionColumnTypes = hmsTable.getPartitionColumnTypes(); + if (!partitionColumnTypes.isEmpty()) { + HudiCachedPartitionProcessor processor = (HudiCachedPartitionProcessor) Env.getCurrentEnv() + .getExtMetaCacheMgr().getHudiPartitionProcess(hmsTable.getCatalog()); + TablePartitionValues partitionValues = processor.getPartitionValues(hmsTable, metaClient); + if (partitionValues != null) { + // 2. prune partitions by expr + partitionValues.readLock().lock(); + try { + Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem(); + this.totalPartitionNum = idToPartitionItem.size(); + ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem, + hmsTable.getPartitionColumns(), columnNameToRange, + partitionValues.getUidToPartitionRange(), + partitionValues.getRangeToId(), + partitionValues.getSingleColumnRangeMap(), + true); + Collection<Long> filteredPartitionIds = pruner.prune(); + this.readPartitionNum = filteredPartitionIds.size(); + // 3. get partitions from cache + String dbName = hmsTable.getDbName(); + String tblName = hmsTable.getName(); + String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); + String basePath = metaClient.getBasePathV2().toString(); + Map<Long, String> partitionIdToNameMap = partitionValues.getPartitionIdToNameMap(); + Map<Long, List<String>> partitionValuesMap = partitionValues.getPartitionValuesMap(); + return filteredPartitionIds.stream().map(id -> { + String path = basePath + "/" + partitionIdToNameMap.get(id); + return new HivePartition( + dbName, tblName, false, inputFormat, path, partitionValuesMap.get(id)); + }).collect(Collectors.toList()); + } finally { + partitionValues.readLock().unlock(); + } + } + } + // unpartitioned table, create a dummy partition to save location and inputformat, + // so that we can unify the interface. 
+ HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true, + hmsTable.getRemoteTable().getSd().getInputFormat(), + hmsTable.getRemoteTable().getSd().getLocation(), null); + this.totalPartitionNum = 1; + this.readPartitionNum = 1; + return Lists.newArrayList(dummyPartition); + } + @Override public List<Split> getSplits() throws UserException { HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable); @@ -173,13 +230,30 @@ public class HudiScanNode extends HiveScanNode { } HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); - Option<HoodieInstant> latestInstant = timeline.lastInstant(); - if (!latestInstant.isPresent()) { - return new ArrayList<>(); + String queryInstant; + if (desc.getRef().getTableSnapshot() != null) { + queryInstant = desc.getRef().getTableSnapshot().getTime(); + } else { + Option<HoodieInstant> snapshotInstant = timeline.lastInstant(); + if (!snapshotInstant.isPresent()) { + return Collections.emptyList(); + } + queryInstant = snapshotInstant.get().getTimestamp(); } - String queryInstant = latestInstant.get().getTimestamp(); // Non partition table will get one dummy partition - List<HivePartition> partitions = getPartitions(); + UserGroupInformation ugi = HiveMetaStoreClientHelper.getUserGroupInformation( + HiveMetaStoreClientHelper.getConfiguration(hmsTable)); + List<HivePartition> partitions; + if (ugi != null) { + try { + partitions = ugi.doAs( + (PrivilegedExceptionAction<List<HivePartition>>) () -> getPrunedPartitions(hudiClient)); + } catch (Exception e) { + throw new UserException(e); + } + } else { + partitions = getPrunedPartitions(hudiClient); + } try { for (HivePartition partition : partitions) { String globPath; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org