This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5593731566e [fix][refactor] refactor schema init of external table and some parquet issues (#30325)
5593731566e is described below

commit 5593731566e8fb1ad2a554bc5ec1c48fb8f994ba
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Wed Jan 31 18:33:30 2024 +0800

    [fix][refactor] refactor schema init of external table and some parquet issues (#30325)
    
    1. Skip parquet files that contain only the 4-byte magic header "PAR1" (see the sketch below)
    2. Refactor the schema init method of iceberg/hudi/hive tables in the hms catalog
        1. Remove some redundant methods of `getIcebergTable`
        2. Fix the issue described in #23771
    3. Support HoodieParquetInputFormatBase and treat it as a normal hive table format
    4. When listing files, skip all hidden dirs and files
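
    For item 1, a minimal illustrative Java sketch of the size-vs-magic check
    (the real fix is the C++ change in be/src/vec/exec/format/parquet/vparquet_reader.cpp
    below; the class here is hypothetical):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.charset.StandardCharsets;

    public class ParquetMagicCheck {
        // "PAR1" is the parquet magic; a valid file carries it at both ends
        // plus a footer, so a 4-byte file cannot hold any data.
        private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

        public static boolean isEffectivelyEmpty(String path) throws IOException {
            try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
                // Mirrors the BE condition: size <= sizeof(PARQUET_VERSION_NUMBER)
                return file.length() <= MAGIC.length;
            }
        }

        public static void main(String[] args) throws IOException {
            System.out.println(isEffectivelyEmpty(args[0]) ? "skip as empty" : "read");
        }
    }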
---
 .gitignore                                         |  2 +
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  9 ++-
 .../doris/catalog/HiveMetaStoreClientHelper.java   |  2 +-
 .../doris/catalog/external/HMSExternalTable.java   | 73 +++++++++---------
 .../catalog/external/IcebergExternalTable.java     | 76 +-----------------
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 18 ++---
 .../datasource/iceberg/IcebergExternalCatalog.java |  2 +-
 .../apache/doris/external/hive/util/HiveUtil.java  | 44 ++---------
 .../doris/external/iceberg/util/IcebergUtils.java  | 90 ++++++++++++++++++++--
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |  4 +
 .../planner/external/iceberg/IcebergApiSource.java |  9 +--
 .../planner/external/iceberg/IcebergHMSSource.java |  4 +-
 .../external/iceberg/IcebergMetadataCache.java     | 80 ++++---------------
 .../planner/external/iceberg/IcebergScanNode.java  |  3 +-
 .../doris/statistics/util/StatisticsUtil.java      |  2 +-
 15 files changed, 175 insertions(+), 243 deletions(-)

diff --git a/.gitignore b/.gitignore
index a8ad35b55d5..2513a796189 100644
--- a/.gitignore
+++ b/.gitignore
@@ -54,6 +54,8 @@ thirdparty/installed*
 thirdparty/doris-thirdparty*.tar.xz
 
 docker/thirdparties/docker-compose/mysql/data
+docker/thirdparties/docker-compose/hive/scripts/tpch1.db/
+docker/thirdparties/docker-compose/hive/scripts/paimon1
 
 fe_plugins/output
 fe_plugins/**/.factorypath
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6ae4ea2f5bc..dd8d01f4bb1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -238,8 +238,11 @@ Status ParquetReader::_open_file() {
     }
     if (_file_metadata == nullptr) {
         SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
-        if (_file_reader->size() == 0) {
-            return Status::EndOfFile("open file failed, empty parquet file: " + _scan_range.path);
+        if (_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
+            // Some systems may generate a parquet file with only 4 bytes: PAR1
+            // Consider it an empty file.
+            return Status::EndOfFile("open file failed, empty parquet file {} with size: {}",
+                                     _scan_range.path, _file_reader->size());
         }
         size_t meta_size = 0;
         if (_meta_cache == nullptr) {
@@ -928,4 +931,4 @@ int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData&
     }
     return column.data_page_offset;
 }
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 96419cd0b0a..7d0419fcbf9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -770,7 +770,7 @@ public class HiveMetaStoreClientHelper {
         try {
             hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
         } catch (Exception e) {
-            throw new RuntimeException("Cannot get hudi table schema.");
+            throw new RuntimeException("Cannot get hudi table schema.", e);
         }
         return hudiSchema;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index e1037ffd025..028928eafce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSCachedClient;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.statistics.AnalysisInfo;
@@ -62,8 +63,6 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -87,8 +86,10 @@ import java.util.stream.Collectors;
 public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf {
     private static final Logger LOG = LogManager.getLogger(HMSExternalTable.class);
 
-    private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
-    private static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS;
+    public static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
+    public static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS;
+    public static final Set<String> SUPPORTED_HUDI_FILE_FORMATS;
+
     private static final Map<StatsType, String> MAP_SPARK_STATS_TO_DORIS;
     private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
     private static final String TBL_PROP_INSERT_ONLY = "insert_only";
@@ -111,13 +112,16 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
         SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
         SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.mapred.TextInputFormat");
+        // Some hudi tables use HoodieParquetInputFormatBase as their input format,
+        // but we can't treat them as hudi tables.
+        // So add it to SUPPORTED_HIVE_FILE_FORMATS and treat it as a hive table.
+        // Doris will then just list the files from the location and read the parquet files directly.
+        SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hudi.hadoop.HoodieParquetInputFormatBase");
 
         SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS = Sets.newHashSet();
         SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
     }
 
-    private static final Set<String> SUPPORTED_HUDI_FILE_FORMATS;
-
     static {
         SUPPORTED_HUDI_FILE_FORMATS = Sets.newHashSet();
         SUPPORTED_HUDI_FILE_FORMATS.add("org.apache.hudi.hadoop.HoodieParquetInputFormat");
@@ -405,10 +409,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         return ((HMSExternalCatalog) catalog).getHiveMetastoreUris();
     }
 
-    public String getHiveVersion() {
-        return ((HMSExternalCatalog) catalog).getHiveVersion();
-    }
-
     public Map<String, String> getCatalogProperties() {
         return catalog.getProperties();
     }
@@ -454,32 +454,28 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         return initSchema();
     }
 
-
     @Override
     public List<Column> initSchema() {
         makeSureInitialized();
         List<Column> columns;
-        List<FieldSchema> schema = ((HMSExternalCatalog) catalog).getClient().getSchema(dbName, name);
         if (dlaType.equals(DLAType.ICEBERG)) {
-            columns = getIcebergSchema(schema);
+            columns = getIcebergSchema();
         } else if (dlaType.equals(DLAType.HUDI)) {
-            columns = getHudiSchema(schema);
+            columns = getHudiSchema();
         } else {
-            List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
-            for (FieldSchema field : schema) {
-                tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
-                        HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
-                        true, field.getComment(), true, -1));
-            }
-            columns = tmpSchema;
+            columns = getHiveSchema();
         }
         initPartitionColumns(columns);
         return columns;
     }
 
-    public List<Column> getHudiSchema(List<FieldSchema> hmsSchema) {
+    private List<Column> getIcebergSchema() {
+        return IcebergUtils.getSchema(catalog, dbName, name);
+    }
+
+    private List<Column> getHudiSchema() {
         org.apache.avro.Schema hudiSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this);
-        List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
+        List<Column> tmpSchema = Lists.newArrayListWithCapacity(hudiSchema.getFields().size());
         for (org.apache.avro.Schema.Field hudiField : hudiSchema.getFields()) {
             String columnName = hudiField.name().toLowerCase(Locale.ROOT);
             tmpSchema.add(new Column(columnName, HudiUtils.fromAvroHudiTypeToDorisType(hudiField.schema()),
@@ -488,6 +484,19 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         return tmpSchema;
     }
 
+    private List<Column> getHiveSchema() {
+        List<Column> columns;
+        List<FieldSchema> schema = ((HMSExternalCatalog) catalog).getClient().getSchema(dbName, name);
+        List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
+        for (FieldSchema field : schema) {
+            tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
+                    HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
+                    true, field.getComment(), true, -1));
+        }
+        columns = tmpSchema;
+        return columns;
+    }
+
     @Override
     public long getCacheRowCount() {
         //Cached accurate information
@@ -528,20 +537,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
         return 1;
     }
 
-    private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
-        Table icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this);
-        Schema schema = icebergTable.schema();
-        List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
-        for (FieldSchema field : hmsSchema) {
-            tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
-                    HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
-                            IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
-                    true, null, true, false, null, field.getComment(), true, null,
-                    schema.caseInsensitiveFindField(field.getName()).fieldId(), null));
-        }
-        return tmpSchema;
-    }
-
     private void initPartitionColumns(List<Column> schema) {
         List<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
                 .collect(Collectors.toList());
@@ -598,7 +593,9 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
                 return getHiveColumnStats(colName);
             case ICEBERG:
                 return StatisticsUtil.getIcebergColumnStats(colName,
-                        Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
+                        Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
+                                catalog, dbName, name
+                        ));
             default:
                 LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index be99e26de62..be320fc9268 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -17,12 +17,10 @@
 
 package org.apache.doris.catalog.external;
 
-import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Type;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.ColumnStatistic;
@@ -33,21 +31,11 @@ import org.apache.doris.thrift.TIcebergTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
-import com.google.common.collect.Lists;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.types.Types;
-
 import java.util.HashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Optional;
 
 public class IcebergExternalTable extends ExternalTable {
-
-    // https://iceberg.apache.org/spec/#schemas-and-data-types
-    // All time and timestamp values are stored with microsecond precision
-    public static final int ICEBERG_DATETIME_SCALE_MS = 6;
-
     public IcebergExternalTable(long id, String name, String dbName, IcebergExternalCatalog catalog) {
         super(id, name, catalog, dbName, TableType.ICEBERG_EXTERNAL_TABLE);
     }
@@ -65,66 +53,8 @@ public class IcebergExternalTable extends ExternalTable {
 
     @Override
     public List<Column> initSchema() {
-        return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
-            Schema schema = ((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name).schema();
-            List<Types.NestedField> columns = schema.columns();
-            List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
-            for (Types.NestedField field : columns) {
-                tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
-                        icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
-                        schema.caseInsensitiveFindField(field.name()).fieldId()));
-            }
-            return tmpSchema;
-        });
-    }
-
-    private Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
-        switch (primitive.typeId()) {
-            case BOOLEAN:
-                return Type.BOOLEAN;
-            case INTEGER:
-                return Type.INT;
-            case LONG:
-                return Type.BIGINT;
-            case FLOAT:
-                return Type.FLOAT;
-            case DOUBLE:
-                return Type.DOUBLE;
-            case STRING:
-            case BINARY:
-            case UUID:
-                return Type.STRING;
-            case FIXED:
-                Types.FixedType fixed = (Types.FixedType) primitive;
-                return ScalarType.createCharType(fixed.length());
-            case DECIMAL:
-                Types.DecimalType decimal = (Types.DecimalType) primitive;
-                return ScalarType.createDecimalV3Type(decimal.precision(), decimal.scale());
-            case DATE:
-                return ScalarType.createDateV2Type();
-            case TIMESTAMP:
-                return ScalarType.createDatetimeV2Type(ICEBERG_DATETIME_SCALE_MS);
-            case TIME:
-                return Type.UNSUPPORTED;
-            default:
-                throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
-        }
-    }
-
-    protected Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
-        if (type.isPrimitiveType()) {
-            return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
-        }
-        switch (type.typeId()) {
-            case LIST:
-                Types.ListType list = (Types.ListType) type;
-                return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
-            case MAP:
-            case STRUCT:
-                return Type.UNSUPPORTED;
-            default:
-                throw new IllegalArgumentException("Cannot transform unknown type: " + type);
-        }
+        makeSureInitialized();
+        return IcebergUtils.getSchema(catalog, dbName, name);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 56fffc41ddd..ebe8d692c75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -358,14 +358,14 @@ public class HiveMetaStoreCache {
     }
 
     // Get File Status by using FileSystem API.
-    private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
-                                        JobConf jobConf,
-                                        List<String> partitionValues,
-                                        String bindBrokerName) throws UserException {
+    private FileCacheValue getFileCache(String location, String inputFormat,
+            JobConf jobConf,
+            List<String> partitionValues,
+            String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
         RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                 new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
-                    location, bindBrokerName), jobConf, bindBrokerName));
+                        location, bindBrokerName), jobConf, bindBrokerName));
         result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
         try {
             // For Tez engine, it may generate subdirectoies for "union" query.
@@ -425,12 +425,12 @@ public class HiveMetaStoreCache {
             FileInputFormat.setInputPaths(jobConf, finalLocation.get());
             try {
                 FileCacheValue result;
-                InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                 // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
                 if (key.useSelfSplitter) {
-                    result = getFileCache(finalLocation.get(), inputFormat, jobConf,
-                        key.getPartitionValues(), key.bindBrokerName);
+                    result = getFileCache(finalLocation.get(), key.inputFormat, jobConf,
+                            key.getPartitionValues(), key.bindBrokerName);
                 } else {
+                    InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                     InputSplit[] splits;
                     String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
                     if (!Strings.isNullOrEmpty(remoteUser)) {
@@ -1082,7 +1082,7 @@ public class HiveMetaStoreCache {
         private static boolean isGeneratedPath(String name) {
             return "_temporary".equals(name) // generated by spark
                     || "_imapala_insert_staging".equals(name) // generated by 
impala
-                    || name.startsWith(".hive-staging"); // generated by hive
+                    || name.startsWith("."); // generated by hive or hidden 
file
         }
     }
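
For item 4 of the commit message, the broadened filter is easy to exercise in
isolation. Below is a self-contained Java sketch whose method body mirrors the
hunk above; the class name and the sample paths are illustrative only:

    public class GeneratedPathCheck {
        // Matches the post-change isGeneratedPath: any dot-prefixed name is
        // skipped, which covers ".hive-staging*" plus arbitrary hidden files.
        static boolean isGeneratedPath(String name) {
            return "_temporary".equals(name) // generated by spark
                    || "_imapala_insert_staging".equals(name) // generated by impala
                    || name.startsWith("."); // generated by hive or hidden file
        }

        public static void main(String[] args) {
            // ".DS_Store" was kept before this commit and is skipped after it.
            String[] samples = {"_temporary", ".hive-staging_hive_2024", ".DS_Store", "part-00000.parquet"};
            for (String name : samples) {
                System.out.println(name + " -> " + (isGeneratedPath(name) ? "skip" : "keep"));
            }
        }
    }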
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index c8ff468ab29..fadc60913be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -106,6 +106,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
         return Env.getCurrentEnv()
                 .getExtMetaCacheMgr()
                 .getIcebergMetadataCache()
-                .getIcebergTable(catalog, id, dbName, tblName, getProperties());
+                .getIcebergTable(this, dbName, tblName);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index deb048b5943..7ed44620a37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -21,16 +21,14 @@ import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.BrokerFileSystem;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
@@ -46,10 +44,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -134,7 +129,6 @@ public final class HiveUtil {
     }
 
     private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) {
-
         switch (hiveTypeInfo.getCategory()) {
             case PRIMITIVE: {
                 PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) hiveTypeInfo;
@@ -190,39 +184,14 @@ public final class HiveUtil {
         }
     }
 
-    public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
-                                       String location, JobConf jobConf) throws UserException {
+    public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inputFormat,
+            String location, JobConf jobConf) throws UserException {
         if (remoteFileSystem instanceof BrokerFileSystem) {
-            return ((BrokerFileSystem) remoteFileSystem)
-                    .isSplittable(location, inputFormat.getClass().getCanonicalName());
-        }
-
-        // ORC uses a custom InputFormat but is always splittable
-        if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
-            return true;
-        }
-        // use reflection to get isSplitable method on FileInputFormat
-        // ATTN: the method name is actually "isSplitable", but the right spell is "isSplittable"
-        Method method = null;
-        for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
-            try {
-                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
-                break;
-            } catch (NoSuchMethodException ignored) {
-                LOG.debug("Class {} doesn't contain isSplitable method.", clazz);
-            }
+            return ((BrokerFileSystem) remoteFileSystem).isSplittable(location, inputFormat);
         }
 
-        if (method == null) {
-            return false;
-        }
-        Path path = new Path(location);
-        try {
-            method.setAccessible(true);
-            return (boolean) method.invoke(inputFormat, FileSystemFactory.getNativeByPath(path, jobConf), path);
-        } catch (InvocationTargetException | IllegalAccessException | IOException e) {
-            throw new RuntimeException(e);
-        }
+        // All supported hive input formats are splittable
+        return HMSExternalTable.SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormat);
     }
 
     public static String getHivePartitionValue(String part) {
@@ -236,5 +205,4 @@ public final class HiveUtil {
             throw new RuntimeException(e);
         }
     }
-
 }
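
The new splittability rule reduces to set membership instead of reflectively
probing isSplitable(). A hedged, self-contained Java sketch follows; the class
name is hypothetical and the hard-coded set is copied from the formats named in
this commit, not read from HiveUtil itself:

    import java.util.Set;

    public class SplittableCheck {
        static final Set<String> SUPPORTED_HIVE_FILE_FORMATS = Set.of(
                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
                "org.apache.hadoop.mapred.TextInputFormat",
                "org.apache.hudi.hadoop.HoodieParquetInputFormatBase");

        static boolean isSplittable(String inputFormat) {
            // All supported hive input formats are splittable.
            return SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormat);
        }

        public static void main(String[] args) {
            System.out.println(isSplittable("org.apache.hadoop.mapred.TextInputFormat")); // true
            System.out.println(isSplittable("com.example.CustomInputFormat"));            // false
        }
    }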
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
index 78a0df2ee6d..8a6864aba3a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
@@ -33,9 +33,17 @@ import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.Subquery;
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Type;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.thrift.TExprOpcode;
 
+import com.google.common.collect.Lists;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -45,19 +53,17 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 
 /**
  * Iceberg utils
  */
 public class IcebergUtils {
     private static final Logger LOG = LogManager.getLogger(IcebergUtils.class);
-    private static ThreadLocal<Integer> columnIdThreadLocal = new ThreadLocal<Integer>() {
-        @Override
-        public Integer initialValue() {
-            return 0;
-        }
-    };
-    static long MILLIS_TO_NANO_TIME = 1000;
+    private static long MILLIS_TO_NANO_TIME = 1000;
+    // https://iceberg.apache.org/spec/#schemas-and-data-types
+    // All time and timestamp values are stored with microsecond precision
+    private static final int ICEBERG_DATETIME_SCALE_MS = 6;
 
     public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
@@ -238,4 +244,74 @@ public class IcebergUtils {
         }
         return slotRef;
     }
+
+    private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
+        switch (primitive.typeId()) {
+            case BOOLEAN:
+                return Type.BOOLEAN;
+            case INTEGER:
+                return Type.INT;
+            case LONG:
+                return Type.BIGINT;
+            case FLOAT:
+                return Type.FLOAT;
+            case DOUBLE:
+                return Type.DOUBLE;
+            case STRING:
+            case BINARY:
+            case UUID:
+                return Type.STRING;
+            case FIXED:
+                Types.FixedType fixed = (Types.FixedType) primitive;
+                return ScalarType.createCharType(fixed.length());
+            case DECIMAL:
+                Types.DecimalType decimal = (Types.DecimalType) primitive;
+                return ScalarType.createDecimalV3Type(decimal.precision(), decimal.scale());
+            case DATE:
+                return ScalarType.createDateV2Type();
+            case TIMESTAMP:
+                return ScalarType.createDatetimeV2Type(ICEBERG_DATETIME_SCALE_MS);
+            case TIME:
+                return Type.UNSUPPORTED;
+            default:
+                throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
+        }
+    }
+
+    public static Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
+        if (type.isPrimitiveType()) {
+            return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
+        }
+        switch (type.typeId()) {
+            case LIST:
+                Types.ListType list = (Types.ListType) type;
+                return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
+            case MAP:
+            case STRUCT:
+                return Type.UNSUPPORTED;
+            default:
+                throw new IllegalArgumentException("Cannot transform unknown type: " + type);
+        }
+    }
+
+    /**
+     * Get the iceberg schema from the catalog and convert it to a doris schema
+     */
+    public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
+        return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
+            org.apache.iceberg.Table icebergTable = Env.getCurrentEnv()
+                    .getExtMetaCacheMgr()
+                    .getIcebergMetadataCache()
+                    .getIcebergTable(catalog, dbName, name);
+            Schema schema = icebergTable.schema();
+            List<Types.NestedField> columns = schema.columns();
+            List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
+            for (Types.NestedField field : columns) {
+                tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+                        IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
+                        schema.caseInsensitiveFindField(field.name()).fieldId()));
+            }
+            return tmpSchema;
+        });
+    }
 }
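
A hedged usage sketch of the consolidated converter above (it assumes the Doris
FE classpath plus the iceberg-api dependency; the demo class is hypothetical
and not part of the patch):

    import org.apache.doris.external.iceberg.util.IcebergUtils;
    import org.apache.iceberg.types.Types;

    public class TypeMappingDemo {
        public static void main(String[] args) {
            // decimal(10, 2) follows the DECIMAL branch -> DecimalV3(10, 2)
            System.out.println(IcebergUtils.icebergTypeToDorisType(Types.DecimalType.of(10, 2)));
            // list<int> follows the LIST branch -> ARRAY<INT>
            System.out.println(IcebergUtils.icebergTypeToDorisType(
                    Types.ListType.ofOptional(1, Types.IntegerType.get())));
        }
    }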
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 1a39f729738..9b5added1b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -148,6 +148,10 @@ public class MTMVTask extends AbstractTask {
         LOG.info("mtmv task run, taskId: {}", super.getTaskId());
         try {
             ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
+            if (LOG.isDebugEnabled()) {
+                String taskSessionContext = ctx.getSessionVariable().toJson().toJSONString();
+                LOG.debug("mtmv task session variable, taskId: {}, session: {}", super.getTaskId(), taskSessionContext);
+            }
             // Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
             // such as deleting a table and creating a view with the same name
             this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
index 73ac8ed7b39..6eed310927c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
@@ -24,7 +24,6 @@ import org.apache.doris.catalog.external.IcebergExternalTable;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.thrift.TFileAttributes;
 
@@ -48,11 +47,9 @@ public class IcebergApiSource implements IcebergSource {
         this.icebergExtTable = table;
 
         this.originTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
-            ((IcebergExternalCatalog) icebergExtTable.getCatalog()).getCatalog(),
-            icebergExtTable.getCatalog().getId(),
-            icebergExtTable.getDbName(),
-            icebergExtTable.getName(),
-            icebergExtTable.getCatalog().getProperties());
+                icebergExtTable.getCatalog(),
+                icebergExtTable.getDbName(),
+                icebergExtTable.getName());
 
         this.desc = desc;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
index 478e78c0d0e..62c96930f00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
@@ -47,7 +47,9 @@ public class IcebergHMSSource implements IcebergSource {
         this.desc = desc;
         this.columnNameToRange = columnNameToRange;
         this.icebergTable =
-            Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(hmsTable);
+                Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
+                        .getIcebergTable(hmsTable.getCatalog(),
+                                hmsTable.getDbName(), hmsTable.getName());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
index 91a208202d0..9f273ca6305 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
@@ -19,9 +19,7 @@ package org.apache.doris.planner.external.iceberg;
 
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.HMSExternalCatalog;
@@ -72,76 +70,44 @@ public class IcebergMetadataCache {
             return ifPresent;
         }
 
-        Table icebergTable = getIcebergTable(key, catalog, params.getDatabase(), params.getTable());
+        Table icebergTable = getIcebergTable(catalog, params.getDatabase(), params.getTable());
         List<Snapshot> snaps = Lists.newArrayList();
         Iterables.addAll(snaps, icebergTable.snapshots());
         snapshotListCache.put(key, snaps);
         return snaps;
     }
 
-    public Table getIcebergTable(IcebergMetadataCacheKey key, CatalogIf catalog, String dbName, String tbName)
-            throws UserException {
+    public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) {
+        IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog.getId(), dbName, tbName);
         Table cacheTable = tableCache.getIfPresent(key);
         if (cacheTable != null) {
             return cacheTable;
         }
 
-        Table icebergTable;
+        Catalog icebergCatalog;
         if (catalog instanceof HMSExternalCatalog) {
             HMSExternalCatalog ctg = (HMSExternalCatalog) catalog;
-            icebergTable = createIcebergTable(
-                ctg.getHiveMetastoreUris(),
-                ctg.getCatalogProperty().getHadoopProperties(),
-                dbName,
-                tbName,
-                ctg.getProperties());
+            icebergCatalog = createIcebergHiveCatalog(
+                    ctg.getHiveMetastoreUris(),
+                    ctg.getCatalogProperty().getHadoopProperties(),
+                    ctg.getProperties());
         } else if (catalog instanceof IcebergExternalCatalog) {
-            IcebergExternalCatalog extCatalog = (IcebergExternalCatalog) catalog;
-            icebergTable = getIcebergTable(
-                extCatalog.getCatalog(), extCatalog.getId(), dbName, tbName, extCatalog.getProperties());
+            icebergCatalog = ((IcebergExternalCatalog) catalog).getCatalog();
         } else {
-            throw new UserException("Only support 'hms' and 'iceberg' type for iceberg table");
-        }
-        tableCache.put(key, icebergTable);
-        return icebergTable;
-    }
-
-    public Table getIcebergTable(IcebergSource icebergSource) throws MetaNotFoundException {
-        return icebergSource.getIcebergTable();
-    }
-
-    public Table getIcebergTable(HMSExternalTable hmsTable) {
-        IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(
-                hmsTable.getCatalog().getId(),
-                hmsTable.getDbName(),
-                hmsTable.getName());
-        Table table = tableCache.getIfPresent(key);
-        if (table != null) {
-            return table;
+            throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table");
         }
-        Table icebergTable = createIcebergTable(hmsTable);
+        Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(catalog.getId(),
+                () -> icebergCatalog.loadTable(TableIdentifier.of(dbName, tbName)));
+        initIcebergTableFileIO(icebergTable, catalog.getProperties());
         tableCache.put(key, icebergTable);
-
         return icebergTable;
     }
 
-    public Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName,
+    private Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName,
             Map<String, String> props) {
-        IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(
-                catalogId,
-                dbName,
-                tbName);
-        Table cacheTable = tableCache.getIfPresent(key);
-        if (cacheTable != null) {
-            return cacheTable;
-        }
-
         Table table = HiveMetaStoreClientHelper.ugiDoAs(catalogId,
                 () -> catalog.loadTable(TableIdentifier.of(dbName, tbName)));
         initIcebergTableFileIO(table, props);
-
-        tableCache.put(key, table);
-
         return table;
     }
 
@@ -190,14 +156,12 @@ public class IcebergMetadataCache {
                 });
     }
 
-    private Table createIcebergTable(String uri, Map<String, String> hdfsConf, String db, String tbl,
-            Map<String, String> props) {
+    private Catalog createIcebergHiveCatalog(String uri, Map<String, String> hdfsConf, Map<String, String> props) {
         // set hdfs configure
         Configuration conf = new HdfsConfiguration();
         for (Map.Entry<String, String> entry : hdfsConf.entrySet()) {
             conf.set(entry.getKey(), entry.getValue());
         }
-
         HiveCatalog hiveCatalog = new HiveCatalog();
         hiveCatalog.setConf(conf);
 
@@ -211,20 +175,10 @@ public class IcebergMetadataCache {
             catalogProperties.put("uri", uri);
             hiveCatalog.initialize("hive", catalogProperties);
         }
-        Table table = HiveMetaStoreClientHelper.ugiDoAs(conf, () -> hiveCatalog.loadTable(TableIdentifier.of(db, tbl)));
-        initIcebergTableFileIO(table, props);
-        return table;
-    }
-
-    private Table createIcebergTable(HMSExternalTable hmsTable) {
-        return createIcebergTable(hmsTable.getMetastoreUri(),
-            hmsTable.getHadoopProperties(),
-            hmsTable.getDbName(),
-            hmsTable.getName(),
-            hmsTable.getCatalogProperties());
+        return hiveCatalog;
     }
 
-    private void initIcebergTableFileIO(Table table, Map<String, String> props) {
+    private static void initIcebergTableFileIO(Table table, Map<String, String> props) {
         Map<String, String> ioConf = new HashMap<>();
         table.properties().forEach((key, value) -> {
             if (key.startsWith("io.")) {
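
For orientation, a hedged sketch of how call sites resolve iceberg tables after
this refactor (it assumes the Doris FE classpath; the wrapper class is
hypothetical and not part of the patch):

    import org.apache.doris.catalog.Env;
    import org.apache.doris.datasource.CatalogIf;
    import org.apache.iceberg.Table;

    public class IcebergLookupDemo {
        // One cache-keyed entry point now serves both HMS and native iceberg
        // catalogs, replacing the removed per-table overloads.
        static Table load(CatalogIf catalog, String dbName, String tblName) {
            return Env.getCurrentEnv()
                    .getExtMetaCacheMgr()
                    .getIcebergMetadataCache()
                    .getIcebergTable(catalog, dbName, tblName);
        }
    }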
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index a335ccfa021..18b745e402a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.TableIf;
@@ -128,7 +127,7 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     @Override
     protected void doInitialize() throws UserException {
-        icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(source);
+        icebergTable = source.getIcebergTable();
         super.doInitialize();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 40ea594819c..65c4800c585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -609,7 +609,7 @@ public class StatisticsUtil {
             Table icebergTable = Env.getCurrentEnv()
                     .getExtMetaCacheMgr()
                     .getIcebergMetadataCache()
-                    .getIcebergTable(table);
+                    .getIcebergTable(table.getCatalog(), table.getDbName(), table.getName());
             TableScan tableScan = icebergTable.newScan().includeColumnStats();
             for (FileScanTask task : tableScan.planFiles()) {
                 rowCount += task.file().recordCount();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

