This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 5593731566e  [fix][refactor] refactor schema init of external table and some parquet issues (#30325)
5593731566e is described below

commit 5593731566e8fb1ad2a554bc5ec1c48fb8f994ba
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Wed Jan 31 18:33:30 2024 +0800

    [fix][refactor] refactor schema init of external table and some parquet issues (#30325)

    1. Skip parquet files that are only 4 bytes long: PAR1
    2. Refactor the schema init method of iceberg/hudi/hive tables in the hms catalog
       1. Remove some redundant methods of `getIcebergTable`
       2. Fix the issue described in #23771
    3. Support HoodieParquetInputFormatBase, treating it as the normal hive table format
    4. When listing files, skip all hidden dirs and files
---
 .gitignore                                         |  2 +
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  9 ++-
 .../doris/catalog/HiveMetaStoreClientHelper.java   |  2 +-
 .../doris/catalog/external/HMSExternalTable.java   | 73 +++++++++---------
 .../catalog/external/IcebergExternalTable.java     | 76 +-----------------
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 18 ++---
 .../datasource/iceberg/IcebergExternalCatalog.java |  2 +-
 .../apache/doris/external/hive/util/HiveUtil.java  | 44 ++---------
 .../doris/external/iceberg/util/IcebergUtils.java  | 90 ++++++++++++++++++++--
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |  4 +
 .../planner/external/iceberg/IcebergApiSource.java |  9 +--
 .../planner/external/iceberg/IcebergHMSSource.java |  4 +-
 .../external/iceberg/IcebergMetadataCache.java     | 80 ++++---------------
 .../planner/external/iceberg/IcebergScanNode.java  |  3 +-
 .../doris/statistics/util/StatisticsUtil.java      |  2 +-
 15 files changed, 175 insertions(+), 243 deletions(-)

diff --git a/.gitignore b/.gitignore
index a8ad35b55d5..2513a796189 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,6 +54,8 @@ thirdparty/installed*
 thirdparty/doris-thirdparty*.tar.xz
 
 docker/thirdparties/docker-compose/mysql/data
+docker/thirdparties/docker-compose/hive/scripts/tpch1.db/
+docker/thirdparties/docker-compose/hive/scripts/paimon1
 
 fe_plugins/output
 fe_plugins/**/.factorypath

diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6ae4ea2f5bc..dd8d01f4bb1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -238,8 +238,11 @@ Status ParquetReader::_open_file() {
     }
     if (_file_metadata == nullptr) {
         SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
-        if (_file_reader->size() == 0) {
-            return Status::EndOfFile("open file failed, empty parquet file: " + _scan_range.path);
+        if (_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
+            // Some systems may generate a parquet file with only 4 bytes: PAR1.
+            // Treat such a file as empty.
+            return Status::EndOfFile("open file failed, empty parquet file {} with size: {}",
+                                     _scan_range.path, _file_reader->size());
         }
         size_t meta_size = 0;
         if (_meta_cache == nullptr) {
@@ -928,4 +931,4 @@ int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData&
     }
     return column.data_page_offset;
 }
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
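A parquet file is framed by the 4-byte magic "PAR1" at both its head and tail, so a file whose total size does not exceed the magic cannot contain a footer or any row groups. As a minimal standalone sketch of the same guard (a hypothetical helper in Java for illustration, not code from this commit):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class ParquetMagicCheck {
        // "PAR1" -- the 4-byte parquet magic found at the head and tail of a file.
        private static final int PARQUET_MAGIC_LENGTH = 4;

        // True when the file is too small to hold anything beyond the magic,
        // mirroring the size <= sizeof(PARQUET_VERSION_NUMBER) check above.
        public static boolean isEffectivelyEmpty(String path) throws IOException {
            try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
                return file.length() <= PARQUET_MAGIC_LENGTH;
            }
        }
    }

Returning EndOfFile for such files lets the scanner skip them instead of failing the whole query on a footer-parse error.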
+ return Status::EndOfFile("open file failed, empty parquet file {} with size: {}", + _scan_range.path, _file_reader->size()); } size_t meta_size = 0; if (_meta_cache == nullptr) { @@ -928,4 +931,4 @@ int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& } return column.data_page_offset; } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 96419cd0b0a..7d0419fcbf9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -770,7 +770,7 @@ public class HiveMetaStoreClientHelper { try { hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema()); } catch (Exception e) { - throw new RuntimeException("Cannot get hudi table schema."); + throw new RuntimeException("Cannot get hudi table schema.", e); } return hudiSchema; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index e1037ffd025..028928eafce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -32,6 +32,7 @@ import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSCachedClient; import org.apache.doris.datasource.hive.HiveMetaStoreCache; import org.apache.doris.datasource.hive.HivePartition; +import org.apache.doris.external.iceberg.util.IcebergUtils; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.statistics.AnalysisInfo; @@ -62,8 +63,6 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -87,8 +86,10 @@ import java.util.stream.Collectors; public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf { private static final Logger LOG = LogManager.getLogger(HMSExternalTable.class); - private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS; - private static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS; + public static final Set<String> SUPPORTED_HIVE_FILE_FORMATS; + public static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS; + public static final Set<String> SUPPORTED_HUDI_FILE_FORMATS; + private static final Map<StatsType, String> MAP_SPARK_STATS_TO_DORIS; private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties"; private static final String TBL_PROP_INSERT_ONLY = "insert_only"; @@ -111,13 +112,16 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"); SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.mapred.TextInputFormat"); + // Some 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index be99e26de62..be320fc9268 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -17,12 +17,10 @@
 
 package org.apache.doris.catalog.external;
 
-import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Type;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.ColumnStatistic;
@@ -33,21 +31,11 @@ import org.apache.doris.thrift.TIcebergTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
-import com.google.common.collect.Lists;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.types.Types;
-
 import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Optional;
 
 public class IcebergExternalTable extends ExternalTable {
-
-    // https://iceberg.apache.org/spec/#schemas-and-data-types
-    // All time and timestamp values are stored with microsecond precision
-    public static final int ICEBERG_DATETIME_SCALE_MS = 6;
-
     public IcebergExternalTable(long id, String name, String dbName, IcebergExternalCatalog catalog) {
         super(id, name, catalog, dbName, TableType.ICEBERG_EXTERNAL_TABLE);
     }
@@ -65,66 +53,8 @@ public class IcebergExternalTable extends ExternalTable {
 
     @Override
     public List<Column> initSchema() {
-        return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
-            Schema schema = ((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name).schema();
-            List<Types.NestedField> columns = schema.columns();
-            List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
-            for (Types.NestedField field : columns) {
-                tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
-                        icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
-                        schema.caseInsensitiveFindField(field.name()).fieldId()));
-            }
-            return tmpSchema;
-        });
-    }
-
-    private Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
-        switch (primitive.typeId()) {
-            case BOOLEAN:
-                return Type.BOOLEAN;
-            case INTEGER:
-                return Type.INT;
-            case LONG:
-                return Type.BIGINT;
-            case FLOAT:
-                return Type.FLOAT;
-            case DOUBLE:
-                return Type.DOUBLE;
-            case STRING:
-            case BINARY:
-            case UUID:
-                return Type.STRING;
-            case FIXED:
-                Types.FixedType fixed = (Types.FixedType) primitive;
-                return ScalarType.createCharType(fixed.length());
-            case DECIMAL:
-                Types.DecimalType decimal = (Types.DecimalType) primitive;
-                return ScalarType.createDecimalV3Type(decimal.precision(), decimal.scale());
-            case DATE:
-                return ScalarType.createDateV2Type();
-            case TIMESTAMP:
-                return ScalarType.createDatetimeV2Type(ICEBERG_DATETIME_SCALE_MS);
-            case TIME:
-                return Type.UNSUPPORTED;
-            default:
-                throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
-        }
-    }
-
-    protected Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
-        if (type.isPrimitiveType()) {
-            return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
-        }
-        switch (type.typeId()) {
-            case LIST:
-                Types.ListType list = (Types.ListType) type;
-                return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
-            case MAP:
-            case STRUCT:
-                return Type.UNSUPPORTED;
-            default:
-                throw new IllegalArgumentException("Cannot transform unknown type: " + type);
-        }
+        makeSureInitialized();
+        return IcebergUtils.getSchema(catalog, dbName, name);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 56fffc41ddd..ebe8d692c75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -358,14 +358,14 @@ public class HiveMetaStoreCache {
     }
 
     // Get File Status by using FileSystem API.
-    private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
-                                        JobConf jobConf,
-                                        List<String> partitionValues,
-                                        String bindBrokerName) throws UserException {
+    private FileCacheValue getFileCache(String location, String inputFormat,
+            JobConf jobConf,
+            List<String> partitionValues,
+            String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                 new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
-                        location, bindBrokerName), jobConf, bindBrokerName));
+                location, bindBrokerName), jobConf, bindBrokerName));
         result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
         try {
             // For Tez engine, it may generate subdirectories for "union" query.
@@ -425,12 +425,12 @@ public class HiveMetaStoreCache {
                 FileInputFormat.setInputPaths(jobConf, finalLocation.get());
                 try {
                     FileCacheValue result;
-                    InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                     // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
                     if (key.useSelfSplitter) {
-                        result = getFileCache(finalLocation.get(), inputFormat, jobConf,
-                            key.getPartitionValues(), key.bindBrokerName);
+                        result = getFileCache(finalLocation.get(), key.inputFormat, jobConf,
+                                key.getPartitionValues(), key.bindBrokerName);
                     } else {
+                        InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                         InputSplit[] splits;
                         String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
                         if (!Strings.isNullOrEmpty(remoteUser)) {
@@ -1082,7 +1082,7 @@ public class HiveMetaStoreCache {
         private static boolean isGeneratedPath(String name) {
             return "_temporary".equals(name) // generated by spark
                     || "_imapala_insert_staging".equals(name) // generated by impala
-                    || name.startsWith(".hive-staging"); // generated by hive
+                    || name.startsWith("."); // generated by hive or hidden file
         }
     }
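Broadening the check from ".hive-staging" to any dot-prefixed name implements point 4 of the commit message: all hidden directories and files are skipped while listing. A small standalone demo of the predicate's effect (same logic, copied out for illustration):

    public class GeneratedPathFilter {
        // Mirrors the updated isGeneratedPath above.
        static boolean isGeneratedPath(String name) {
            return "_temporary".equals(name) // generated by spark
                    || "_imapala_insert_staging".equals(name) // generated by impala
                    || name.startsWith("."); // generated by hive, or any hidden entry
        }

        public static void main(String[] args) {
            System.out.println(isGeneratedPath(".hive-staging_hive_2024")); // true, as before
            System.out.println(isGeneratedPath(".DS_Store")); // true, newly skipped
            System.out.println(isGeneratedPath("part-00000.parquet")); // false, still listed
        }
    }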
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index c8ff468ab29..fadc60913be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -106,6 +106,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
         return Env.getCurrentEnv()
                 .getExtMetaCacheMgr()
                 .getIcebergMetadataCache()
-                .getIcebergTable(catalog, id, dbName, tblName, getProperties());
+                .getIcebergTable(this, dbName, tblName);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index deb048b5943..7ed44620a37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -21,16 +21,14 @@ import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.BrokerFileSystem;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -46,10 +44,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -134,7 +129,6 @@ public final class HiveUtil {
     }
 
     private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) {
-
         switch (hiveTypeInfo.getCategory()) {
             case PRIMITIVE: {
                 PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) hiveTypeInfo;
@@ -190,39 +184,14 @@ public final class HiveUtil {
         }
     }
 
-    public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
-                                       String location, JobConf jobConf) throws UserException {
+    public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inputFormat,
+            String location, JobConf jobConf) throws UserException {
         if (remoteFileSystem instanceof BrokerFileSystem) {
-            return ((BrokerFileSystem) remoteFileSystem)
-                    .isSplittable(location, inputFormat.getClass().getCanonicalName());
-        }
-
-        // ORC uses a custom InputFormat but is always splittable
-        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
-            return true;
-        }
-        // use reflection to get isSplitable method on FileInputFormat
-        // ATTN: the method name is actually "isSplitable", but the right spell is "isSplittable"
-        Method method = null;
-        for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
-            try {
-                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
-                break;
-            } catch (NoSuchMethodException ignored) {
-                LOG.debug("Class {} doesn't contain isSplitable method.", clazz);
-            }
+            return ((BrokerFileSystem) remoteFileSystem).isSplittable(location, inputFormat);
         }
-        if (method == null) {
-            return false;
-        }
-        Path path = new Path(location);
-        try {
-            method.setAccessible(true);
-            return (boolean) method.invoke(inputFormat, FileSystemFactory.getNativeByPath(path, jobConf), path);
-        } catch (InvocationTargetException | IllegalAccessException | IOException e) {
-            throw new RuntimeException(e);
-        }
+        // All supported hive input formats are splittable
+        return HMSExternalTable.SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormat);
     }
 
     public static String getHivePartitionValue(String part) {
@@ -236,5 +205,4 @@ public final class HiveUtil {
             throw new RuntimeException(e);
         }
     }
-
 }
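The reflection walk over Hadoop's protected isSplitable() is gone, replaced by a direct rule: a broker file system answers for itself, and everything else is splittable exactly when its input format is in the supported set. In isolation (a sketch; the set contents are copied from HMSExternalTable above):

    import java.util.Set;

    public class SplittableSketch {
        static final Set<String> SUPPORTED_HIVE_FILE_FORMATS = Set.of(
                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
                "org.apache.hadoop.mapred.TextInputFormat",
                "org.apache.hudi.hadoop.HoodieParquetInputFormatBase");

        // All supported hive input formats are splittable, so a set lookup suffices.
        static boolean isSplittable(String inputFormatClassName) {
            return SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormatClassName);
        }
    }

This works because isSplittable now receives the input format as a class-name string rather than an instantiated InputFormat, which is also what lets getFileCache above skip instantiating the format entirely on the self-splitter path.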
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
index 78a0df2ee6d..8a6864aba3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
@@ -33,9 +33,17 @@ import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.Subquery;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.thrift.TExprOpcode;
 
+import com.google.common.collect.Lists;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -45,19 +53,17 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 
 /**
  * Iceberg utils
  */
 public class IcebergUtils {
     private static final Logger LOG = LogManager.getLogger(IcebergUtils.class);
-    private static ThreadLocal<Integer> columnIdThreadLocal = new ThreadLocal<Integer>() {
-        @Override
-        public Integer initialValue() {
-            return 0;
-        }
-    };
-    static long MILLIS_TO_NANO_TIME = 1000;
+    private static long MILLIS_TO_NANO_TIME = 1000;
+    // https://iceberg.apache.org/spec/#schemas-and-data-types
+    // All time and timestamp values are stored with microsecond precision
+    private static final int ICEBERG_DATETIME_SCALE_MS = 6;
 
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
@@ -238,4 +244,74 @@ public class IcebergUtils {
         }
         return slotRef;
     }
+
+    private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
+        switch (primitive.typeId()) {
+            case BOOLEAN:
+                return Type.BOOLEAN;
+            case INTEGER:
+                return Type.INT;
+            case LONG:
+                return Type.BIGINT;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case STRING:
+            case BINARY:
+            case UUID:
+                return Type.STRING;
+            case FIXED:
+                Types.FixedType fixed = (Types.FixedType) primitive;
+                return ScalarType.createCharType(fixed.length());
+            case DECIMAL:
+                Types.DecimalType decimal = (Types.DecimalType) primitive;
+                return ScalarType.createDecimalV3Type(decimal.precision(), decimal.scale());
+            case DATE:
+                return ScalarType.createDateV2Type();
+            case TIMESTAMP:
+                return ScalarType.createDatetimeV2Type(ICEBERG_DATETIME_SCALE_MS);
+            case TIME:
+                return Type.UNSUPPORTED;
+            default:
+                throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
+        }
+    }
+
+    public static Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
+        if (type.isPrimitiveType()) {
+            return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
+        }
+        switch (type.typeId()) {
+            case LIST:
+                Types.ListType list = (Types.ListType) type;
+                return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
+            case MAP:
+            case STRUCT:
+                return Type.UNSUPPORTED;
+            default:
+                throw new IllegalArgumentException("Cannot transform unknown type: " + type);
+        }
+    }
+
+    /**
+     * Get iceberg schema from catalog and convert them to doris schema
+     */
+    public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
+        return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
+            org.apache.iceberg.Table icebergTable = Env.getCurrentEnv()
+                    .getExtMetaCacheMgr()
+                    .getIcebergMetadataCache()
+                    .getIcebergTable(catalog, dbName, name);
+            Schema schema = icebergTable.schema();
+            List<Types.NestedField> columns = schema.columns();
+            List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+            for (Types.NestedField field : columns) {
+                tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+                        IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+                        schema.caseInsensitiveFindField(field.name()).fieldId()));
+            }
+            return tmpSchema;
+        });
+    }
 }
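Because the iceberg spec stores all time and timestamp values with microsecond precision, TIMESTAMP always maps to datetimev2 with scale 6 here. A condensed restatement of the primitive mapping for a few representative cases (returns type names as strings for illustration; assumes the iceberg-api artifact on the classpath, and is not the actual Doris conversion code):

    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.types.Types;

    public class TypeMappingSketch {
        static String toDorisTypeName(Type t) {
            switch (t.typeId()) {
                case BOOLEAN:
                    return "boolean";
                case LONG:
                    return "bigint";
                case TIMESTAMP:
                    return "datetimev2(6)"; // microsecond precision per the iceberg spec
                case DECIMAL:
                    Types.DecimalType d = (Types.DecimalType) t;
                    return "decimalv3(" + d.precision() + ", " + d.scale() + ")";
                case TIME:
                    return "unsupported"; // no matching TIME type on the Doris side
                default:
                    return "see icebergPrimitiveTypeToDorisType above";
            }
        }

        public static void main(String[] args) {
            System.out.println(toDorisTypeName(Types.TimestampType.withZone())); // datetimev2(6)
            System.out.println(toDorisTypeName(Types.DecimalType.of(10, 2))); // decimalv3(10, 2)
        }
    }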
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 1a39f729738..9b5added1b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -148,6 +148,10 @@ public class MTMVTask extends AbstractTask {
         LOG.info("mtmv task run, taskId: {}", super.getTaskId());
         try {
             ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
+            if (LOG.isDebugEnabled()) {
+                String taskSessionContext = ctx.getSessionVariable().toJson().toJSONString();
+                LOG.debug("mtmv task session variable, taskId: {}, session: {}", super.getTaskId(), taskSessionContext);
+            }
             // Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
             // such as deleting a table and creating a view with the same name
             this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
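Wrapping the debug statement in isDebugEnabled() means the full session-variable set is serialized to JSON only when debug logging is actually on, not on every task run. The pattern in isolation (log4j2 API; names illustrative):

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;

    public class GuardedDebugLog {
        private static final Logger LOG = LogManager.getLogger(GuardedDebugLog.class);

        static void logSessionState(long taskId, Object sessionVariables) {
            if (LOG.isDebugEnabled()) {
                // The potentially expensive serialization runs only when needed.
                String rendered = sessionVariables.toString();
                LOG.debug("mtmv task session variable, taskId: {}, session: {}", taskId, rendered);
            }
        }
    }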
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
index 73ac8ed7b39..6eed310927c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
@@ -24,7 +24,6 @@ import org.apache.doris.catalog.external.IcebergExternalTable;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.thrift.TFileAttributes;
 
@@ -48,11 +47,9 @@ public class IcebergApiSource implements IcebergSource {
         this.icebergExtTable = table;
 
         this.originTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
-                ((IcebergExternalCatalog) icebergExtTable.getCatalog()).getCatalog(),
-                icebergExtTable.getCatalog().getId(),
-                icebergExtTable.getDbName(),
-                icebergExtTable.getName(),
-                icebergExtTable.getCatalog().getProperties());
+                icebergExtTable.getCatalog(),
+                icebergExtTable.getDbName(),
+                icebergExtTable.getName());
 
         this.desc = desc;
     }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
index 478e78c0d0e..62c96930f00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
@@ -47,7 +47,9 @@ public class IcebergHMSSource implements IcebergSource {
         this.desc = desc;
         this.columnNameToRange = columnNameToRange;
         this.icebergTable =
-                Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(hmsTable);
+                Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+                        .getIcebergTable(hmsTable.getCatalog(),
+                                hmsTable.getDbName(), hmsTable.getName());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
index 91a208202d0..9f273ca6305 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
@@ -19,9 +19,7 @@ package org.apache.doris.planner.external.iceberg;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.HMSExternalCatalog;
@@ -72,76 +70,44 @@ public class IcebergMetadataCache {
             return ifPresent;
         }
 
-        Table icebergTable = getIcebergTable(key, catalog, params.getDatabase(), params.getTable());
+        Table icebergTable = getIcebergTable(catalog, params.getDatabase(), params.getTable());
         List<Snapshot> snaps = Lists.newArrayList();
         Iterables.addAll(snaps, icebergTable.snapshots());
         snapshotListCache.put(key, snaps);
         return snaps;
     }
 
-    public Table getIcebergTable(IcebergMetadataCacheKey key, CatalogIf catalog, String dbName, String tbName)
-            throws UserException {
+    public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) {
+        IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog.getId(), dbName, tbName);
         Table cacheTable = tableCache.getIfPresent(key);
         if (cacheTable != null) {
             return cacheTable;
         }
 
-        Table icebergTable;
+        Catalog icebergCatalog;
         if (catalog instanceof HMSExternalCatalog) {
             HMSExternalCatalog ctg = (HMSExternalCatalog) catalog;
-            icebergTable = createIcebergTable(
-                    ctg.getHiveMetastoreUris(),
-                    ctg.getCatalogProperty().getHadoopProperties(),
-                    dbName,
-                    tbName,
-                    ctg.getProperties());
+            icebergCatalog = createIcebergHiveCatalog(
+                    ctg.getHiveMetastoreUris(),
+                    ctg.getCatalogProperty().getHadoopProperties(),
+                    ctg.getProperties());
         } else if (catalog instanceof IcebergExternalCatalog) {
-            IcebergExternalCatalog extCatalog = (IcebergExternalCatalog) catalog;
-            icebergTable = getIcebergTable(
-                    extCatalog.getCatalog(), extCatalog.getId(), dbName, tbName, extCatalog.getProperties());
+            icebergCatalog = ((IcebergExternalCatalog) catalog).getCatalog();
         } else {
-            throw new UserException("Only support 'hms' and 'iceberg' type for iceberg table");
-        }
-        tableCache.put(key, icebergTable);
-        return icebergTable;
-    }
-
-    public Table getIcebergTable(IcebergSource icebergSource) throws MetaNotFoundException {
-        return icebergSource.getIcebergTable();
-    }
-
-    public Table getIcebergTable(HMSExternalTable hmsTable) {
-        IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(
-                hmsTable.getCatalog().getId(),
-                hmsTable.getDbName(),
-                hmsTable.getName());
-        Table table = tableCache.getIfPresent(key);
-        if (table != null) {
-            return table;
+            throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table");
         }
-        Table icebergTable = createIcebergTable(hmsTable);
+        Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(catalog.getId(),
+                () -> icebergCatalog.loadTable(TableIdentifier.of(dbName, tbName)));
+        initIcebergTableFileIO(icebergTable, catalog.getProperties());
         tableCache.put(key, icebergTable);
         return icebergTable;
     }
 
-    public Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName,
+    private Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName,
             Map<String, String> props) {
-        IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(
-                catalogId,
-                dbName,
-                tbName);
-        Table cacheTable = tableCache.getIfPresent(key);
-        if (cacheTable != null) {
-            return cacheTable;
-        }
-
         Table table = HiveMetaStoreClientHelper.ugiDoAs(catalogId,
                 () -> catalog.loadTable(TableIdentifier.of(dbName, tbName)));
         initIcebergTableFileIO(table, props);
-
-        tableCache.put(key, table);
         return table;
     }
@@ -190,14 +156,12 @@ public class IcebergMetadataCache {
         });
     }
 
-    private Table createIcebergTable(String uri, Map<String, String> hdfsConf, String db, String tbl,
-            Map<String, String> props) {
+    private Catalog createIcebergHiveCatalog(String uri, Map<String, String> hdfsConf, Map<String, String> props) {
         // set hdfs configure
         Configuration conf = new HdfsConfiguration();
         for (Map.Entry<String, String> entry : hdfsConf.entrySet()) {
             conf.set(entry.getKey(), entry.getValue());
         }
-
         HiveCatalog hiveCatalog = new HiveCatalog();
         hiveCatalog.setConf(conf);
 
@@ -211,20 +175,10 @@ public class IcebergMetadataCache {
             catalogProperties.put("uri", uri);
             hiveCatalog.initialize("hive", catalogProperties);
         }
-        Table table = HiveMetaStoreClientHelper.ugiDoAs(conf, () -> hiveCatalog.loadTable(TableIdentifier.of(db, tbl)));
-        initIcebergTableFileIO(table, props);
-        return table;
-    }
-
-    private Table createIcebergTable(HMSExternalTable hmsTable) {
-        return createIcebergTable(hmsTable.getMetastoreUri(),
-                hmsTable.getHadoopProperties(),
-                hmsTable.getDbName(),
-                hmsTable.getName(),
-                hmsTable.getCatalogProperties());
+        return hiveCatalog;
     }
 
-    private void initIcebergTableFileIO(Table table, Map<String, String> props) {
+    private static void initIcebergTableFileIO(Table table, Map<String, String> props) {
         Map<String, String> ioConf = new HashMap<>();
         table.properties().forEach((key, value) -> {
             if (key.startsWith("io.")) {
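After the refactor there is a single public entry point keyed by (catalogId, db, table); the per-HMSExternalTable and per-IcebergSource overloads are gone, and the backing iceberg Catalog is built per catalog type before one shared load path runs. A stripped-down model of that get-or-load shape, with Guava's cache standing in for tableCache and a string standing in for the loaded Table (illustrative only):

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import java.util.concurrent.ExecutionException;

    public class TableCacheSketch {
        record Key(long catalogId, String db, String table) {}

        private final Cache<Key, String> tableCache = CacheBuilder.newBuilder().build();

        // Look up by composite key; on a miss, the loader stands in for
        // ugiDoAs(() -> icebergCatalog.loadTable(...)) plus file IO init.
        String getIcebergTable(long catalogId, String db, String table) throws ExecutionException {
            return tableCache.get(new Key(catalogId, db, table),
                    () -> "loaded:" + db + "." + table);
        }
    }

Collapsing the overloads also removes the duplicated put/get logic that previously lived in three separate methods.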
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index a335ccfa021..18b745e402a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.TableIf;
@@ -128,7 +127,7 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     @Override
     protected void doInitialize() throws UserException {
-        icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(source);
+        icebergTable = source.getIcebergTable();
         super.doInitialize();
     }

diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 40ea594819c..65c4800c585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -609,7 +609,7 @@ public class StatisticsUtil {
         Table icebergTable = Env.getCurrentEnv()
                 .getExtMetaCacheMgr()
                 .getIcebergMetadataCache()
-                .getIcebergTable(table);
+                .getIcebergTable(table.getCatalog(), table.getDbName(), table.getName());
         TableScan tableScan = icebergTable.newScan().includeColumnStats();
         for (FileScanTask task : tableScan.planFiles()) {
             rowCount += task.file().recordCount();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org