This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 9db3f5b6955 [refactor](hudi, iceberg) optimize some code #42636 (#44148)
9db3f5b6955 is described below

commit 9db3f5b6955f2b997e1d5ab5b663de1411e5eb72
Author: Mingyu Chen (Rayner) <morning...@163.com>
AuthorDate: Tue Nov 19 13:55:02 2024 +0800

    [refactor](hudi, iceberg) optimize some code #42636 (#44148)
    
    cherry pick from #42636
---
 .../apache/doris/datasource/hudi/HudiUtils.java    | 242 +++++++++++----------
 .../doris/datasource/hudi/source/HudiScanNode.java |   2 +-
 .../iceberg/dlf/client/DLFCachedClientPool.java    |  27 ++-
 3 files changed, 153 insertions(+), 118 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
index 3885f1de3ee..d7803b1a516 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/HudiUtils.java
@@ -24,7 +24,6 @@ import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
 
-import com.google.common.base.Preconditions;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -41,30 +40,58 @@ import java.util.stream.Collectors;
 public class HudiUtils {
     private static final SimpleDateFormat defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd");
 
-    public static String fromAvroHudiTypeToHiveTypeString(Schema avroSchema) {
-        Schema.Type columnType = avroSchema.getType();
-        LogicalType logicalType = avroSchema.getLogicalType();
-        switch (columnType) {
+    /**
+     * Convert different query instant time format to the commit time format.
+     * Currently we support three kinds of instant time format for time travel query:
+     * 1、yyyy-MM-dd HH:mm:ss
+     * 2、yyyy-MM-dd
+     * This will convert to 'yyyyMMdd000000'.
+     * 3、yyyyMMddHHmmss
+     */
+    public static String formatQueryInstant(String queryInstant) throws ParseException {
+        int instantLength = queryInstant.length();
+        if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS]
+            if (instantLength == 19) {
+                queryInstant += ".000";
+            }
+            return HoodieInstantTimeGenerator.getInstantForDateString(queryInstant);
+        } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
+                || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS]
+            HoodieActiveTimeline.parseDateFromInstantTime(queryInstant); // validate the format
+            return queryInstant;
+        } else if (instantLength == 10) { // for yyyy-MM-dd
+            return HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant));
+        } else {
+            throw new IllegalArgumentException("Unsupported query instant time 
format: " + queryInstant
+                    + ", Supported time format are: 'yyyy-MM-dd 
HH:mm:ss[.SSS]' "
+                    + "or 'yyyy-MM-dd' or 'yyyyMMddHHmmss[SSS]'");
+        }
+    }
+
+    public static String convertAvroToHiveType(Schema schema) {
+        Schema.Type type = schema.getType();
+        LogicalType logicalType = schema.getLogicalType();
+
+        switch (type) {
             case BOOLEAN:
                 return "boolean";
             case INT:
                 if (logicalType instanceof LogicalTypes.Date) {
                     return "date";
-                } else if (logicalType instanceof LogicalTypes.TimeMillis) {
-                    break;
-                } else {
-                    return "int";
                 }
+                if (logicalType instanceof LogicalTypes.TimeMillis) {
+                    return handleUnsupportedType(schema);
+                }
+                return "int";
             case LONG:
+                if (logicalType instanceof LogicalTypes.TimestampMillis
+                        || logicalType instanceof LogicalTypes.TimestampMicros) {
+                    return logicalType.getName();
+                }
                 if (logicalType instanceof LogicalTypes.TimeMicros) {
-                    break;
-                } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
-                    return "timestamp(3)";
-                } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-                    return "timestamp(6)";
-                } else {
-                    return "bigint";
+                    return handleUnsupportedType(schema);
                 }
+                return "bigint";
             case FLOAT:
                 return "float";
             case DOUBLE:
@@ -74,71 +101,57 @@ public class HudiUtils {
             case FIXED:
             case BYTES:
                 if (logicalType instanceof LogicalTypes.Decimal) {
-                    int precision = ((LogicalTypes.Decimal) logicalType).getPrecision();
-                    int scale = ((LogicalTypes.Decimal) logicalType).getScale();
-                    return String.format("decimal(%s,%s)", precision, scale);
-                } else {
-                    if (columnType == Schema.Type.BYTES) {
-                        return "binary";
-                    }
-                    return "string";
+                    LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
+                    return String.format("decimal(%d,%d)", decimalType.getPrecision(), decimalType.getScale());
                 }
+                return "string";
             case ARRAY:
-                String elementType = fromAvroHudiTypeToHiveTypeString(avroSchema.getElementType());
-                return String.format("array<%s>", elementType);
+                String arrayElementType = convertAvroToHiveType(schema.getElementType());
+                return String.format("array<%s>", arrayElementType);
             case RECORD:
-                List<Field> fields = avroSchema.getFields();
-                Preconditions.checkArgument(fields.size() > 0);
-                String nameToType = fields.stream()
-                        .map(f -> String.format("%s:%s", f.name(),
-                                fromAvroHudiTypeToHiveTypeString(f.schema())))
+                List<Field> recordFields = schema.getFields();
+                if (recordFields.isEmpty()) {
+                    throw new IllegalArgumentException("Record must have 
fields");
+                }
+                String structFields = recordFields.stream()
+                        .map(field -> String.format("%s:%s", field.name(), convertAvroToHiveType(field.schema())))
                         .collect(Collectors.joining(","));
-                return String.format("struct<%s>", nameToType);
+                return String.format("struct<%s>", structFields);
             case MAP:
-                Schema value = avroSchema.getValueType();
-                String valueType = fromAvroHudiTypeToHiveTypeString(value);
-                return String.format("map<%s,%s>", "string", valueType);
+                Schema mapValueType = schema.getValueType();
+                String mapValueTypeString = convertAvroToHiveType(mapValueType);
+                return String.format("map<string,%s>", mapValueTypeString);
             case UNION:
-                List<Schema> nonNullMembers = avroSchema.getTypes().stream()
-                        .filter(schema -> !Schema.Type.NULL.equals(schema.getType()))
+                List<Schema> unionTypes = schema.getTypes().stream()
+                        .filter(s -> s.getType() != Schema.Type.NULL)
                         .collect(Collectors.toList());
-                // The nullable column in hudi is the union type with schemas [null, real column type]
-                if (nonNullMembers.size() == 1) {
-                    return fromAvroHudiTypeToHiveTypeString(nonNullMembers.get(0));
+                if (unionTypes.size() == 1) {
+                    return convertAvroToHiveType(unionTypes.get(0));
                 }
                 break;
             default:
                 break;
         }
-        String errorMsg = String.format("Unsupported hudi %s type of column %s", avroSchema.getType().getName(),
-                avroSchema.getName());
-        throw new IllegalArgumentException(errorMsg);
+
+        throw new IllegalArgumentException(
+                String.format("Unsupported type: %s for column: %s", 
type.getName(), schema.getName()));
+    }
+
+    private static String handleUnsupportedType(Schema schema) {
+        throw new IllegalArgumentException(String.format("Unsupported logical 
type: %s", schema.getLogicalType()));
     }
 
     public static Type fromAvroHudiTypeToDorisType(Schema avroSchema) {
         Schema.Type columnType = avroSchema.getType();
         LogicalType logicalType = avroSchema.getLogicalType();
+
         switch (columnType) {
             case BOOLEAN:
                 return Type.BOOLEAN;
             case INT:
-                if (logicalType instanceof LogicalTypes.Date) {
-                    return ScalarType.createDateV2Type();
-                } else if (logicalType instanceof LogicalTypes.TimeMillis) {
-                    return ScalarType.createTimeV2Type(3);
-                } else {
-                    return Type.INT;
-                }
+                return handleIntType(logicalType);
             case LONG:
-                if (logicalType instanceof LogicalTypes.TimeMicros) {
-                    return ScalarType.createTimeV2Type(6);
-                } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
-                    return ScalarType.createDatetimeV2Type(3);
-                } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-                    return ScalarType.createDatetimeV2Type(6);
-                } else {
-                    return Type.BIGINT;
-                }
+                return handleLongType(logicalType);
             case FLOAT:
                 return Type.FLOAT;
             case DOUBLE:
@@ -147,64 +160,75 @@ public class HudiUtils {
                 return Type.STRING;
             case FIXED:
             case BYTES:
-                if (logicalType instanceof LogicalTypes.Decimal) {
-                    int precision = ((LogicalTypes.Decimal) logicalType).getPrecision();
-                    int scale = ((LogicalTypes.Decimal) logicalType).getScale();
-                    return ScalarType.createDecimalV3Type(precision, scale);
-                } else {
-                    return Type.STRING;
-                }
+                return handleFixedOrBytesType(logicalType);
             case ARRAY:
-                Type innerType = fromAvroHudiTypeToDorisType(avroSchema.getElementType());
-                return ArrayType.create(innerType, true);
+                return handleArrayType(avroSchema);
             case RECORD:
-                ArrayList<StructField> fields = new ArrayList<>();
-                avroSchema.getFields().forEach(
-                        f -> fields.add(new StructField(f.name(), fromAvroHudiTypeToDorisType(f.schema()))));
-                return new StructType(fields);
+                return handleRecordType(avroSchema);
             case MAP:
-                // Hudi map's key must be string
-                return new MapType(Type.STRING, fromAvroHudiTypeToDorisType(avroSchema.getValueType()));
+                return handleMapType(avroSchema);
             case UNION:
-                List<Schema> nonNullMembers = avroSchema.getTypes().stream()
-                        .filter(schema -> !Schema.Type.NULL.equals(schema.getType()))
-                        .collect(Collectors.toList());
-                // The nullable column in hudi is the union type with schemas [null, real column type]
-                if (nonNullMembers.size() == 1) {
-                    return fromAvroHudiTypeToDorisType(nonNullMembers.get(0));
-                }
-                break;
+                return handleUnionType(avroSchema);
             default:
-                break;
+                return Type.UNSUPPORTED;
         }
-        return Type.UNSUPPORTED;
     }
 
-    /**
-     * Convert different query instant time format to the commit time format.
-     * Currently we support three kinds of instant time format for time travel query:
-     * 1、yyyy-MM-dd HH:mm:ss
-     * 2、yyyy-MM-dd
-     * This will convert to 'yyyyMMdd000000'.
-     * 3、yyyyMMddHHmmss
-     */
-    public static String formatQueryInstant(String queryInstant) throws ParseException {
-        int instantLength = queryInstant.length();
-        if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS]
-            if (instantLength == 19) {
-                queryInstant += ".000";
-            }
-            return HoodieInstantTimeGenerator.getInstantForDateString(queryInstant);
-        } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
-                || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS]
-            HoodieActiveTimeline.parseDateFromInstantTime(queryInstant); // validate the format
-            return queryInstant;
-        } else if (instantLength == 10) { // for yyyy-MM-dd
-            return HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant));
-        } else {
-            throw new IllegalArgumentException("Unsupported query instant time 
format: " + queryInstant
-                    + ", Supported time format are: 'yyyy-MM-dd 
HH:mm:ss[.SSS]' "
-                    + "or 'yyyy-MM-dd' or 'yyyyMMddHHmmss[SSS]'");
+    private static Type handleIntType(LogicalType logicalType) {
+        if (logicalType instanceof LogicalTypes.Date) {
+            return ScalarType.createDateV2Type();
+        }
+        if (logicalType instanceof LogicalTypes.TimeMillis) {
+            return ScalarType.createTimeV2Type(3);
+        }
+        return Type.INT;
+    }
+
+    private static Type handleLongType(LogicalType logicalType) {
+        if (logicalType instanceof LogicalTypes.TimeMicros) {
+            return ScalarType.createTimeV2Type(6);
+        }
+        if (logicalType instanceof LogicalTypes.TimestampMillis) {
+            return ScalarType.createDatetimeV2Type(3);
         }
+        if (logicalType instanceof LogicalTypes.TimestampMicros) {
+            return ScalarType.createDatetimeV2Type(6);
+        }
+        return Type.BIGINT;
+    }
+
+    private static Type handleFixedOrBytesType(LogicalType logicalType) {
+        if (logicalType instanceof LogicalTypes.Decimal) {
+            int precision = ((LogicalTypes.Decimal) logicalType).getPrecision();
+            int scale = ((LogicalTypes.Decimal) logicalType).getScale();
+            return ScalarType.createDecimalV3Type(precision, scale);
+        }
+        return Type.STRING;
+    }
+
+    private static Type handleArrayType(Schema avroSchema) {
+        Type innerType = fromAvroHudiTypeToDorisType(avroSchema.getElementType());
+        return ArrayType.create(innerType, true);
+    }
+
+    private static Type handleRecordType(Schema avroSchema) {
+        ArrayList<StructField> fields = new ArrayList<>();
+        avroSchema.getFields().forEach(
+                f -> fields.add(new StructField(f.name(), fromAvroHudiTypeToDorisType(f.schema()))));
+        return new StructType(fields);
+    }
+
+    private static Type handleMapType(Schema avroSchema) {
+        return new MapType(Type.STRING, fromAvroHudiTypeToDorisType(avroSchema.getValueType()));
+    }
+
+    private static Type handleUnionType(Schema avroSchema) {
+        List<Schema> nonNullMembers = avroSchema.getTypes().stream()
+                .filter(schema -> !Schema.Type.NULL.equals(schema.getType()))
+                .collect(Collectors.toList());
+        if (nonNullMembers.size() == 1) {
+            return fromAvroHudiTypeToDorisType(nonNullMembers.get(0));
+        }
+        return Type.UNSUPPORTED;
     }
 }
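
For context on the refactored HudiUtils entry points above, here is a minimal, illustrative
sketch (not part of the commit) of how convertAvroToHiveType and formatQueryInstant are
exercised; the Avro schema construction and the sample instants are assumptions chosen only
for this example:

    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.doris.datasource.hudi.HudiUtils;

    import java.text.ParseException;
    import java.util.Arrays;

    public class HudiUtilsSketch {
        public static void main(String[] args) throws ParseException {
            // A nullable decimal column, modeled the way Hudi stores it in Avro: union(null, decimal).
            Schema decimal = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES));
            Schema nullableDecimal = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), decimal));
            // The union is unwrapped to its single non-null member, yielding "decimal(10,2)".
            System.out.println(HudiUtils.convertAvroToHiveType(nullableDecimal));

            // The three accepted time-travel instant formats are all normalized to Hudi's commit-time format.
            System.out.println(HudiUtils.formatQueryInstant("2024-11-19 13:55:02"));
            System.out.println(HudiUtils.formatQueryInstant("2024-11-19"));
            System.out.println(HudiUtils.formatQueryInstant("20241119135502"));
        }
    }
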
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index abd5a377f5a..a8f2a362bfd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -176,7 +176,7 @@ public class HudiScanNode extends HiveScanNode {
         }
         for (Schema.Field hudiField : hudiSchema.getFields()) {
             columnNames.add(hudiField.name().toLowerCase(Locale.ROOT));
-            String columnType = HudiUtils.fromAvroHudiTypeToHiveTypeString(hudiField.schema());
+            String columnType = HudiUtils.convertAvroToHiveType(hudiField.schema());
             columnTypes.add(columnType);
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java
index f8e70ebd3f5..23b814c13b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/dlf/client/DLFCachedClientPool.java
@@ -41,22 +41,32 @@ public class DLFCachedClientPool implements ClientPool<IMetaStoreClient, TExcept
     public DLFCachedClientPool(Configuration conf, Map<String, String> properties) {
         this.conf = conf;
         this.endpoint = conf.get("", "");
-        this.clientPoolSize =
-            PropertyUtil.propertyAsInt(
+        this.clientPoolSize = getClientPoolSize(properties);
+        this.evictionInterval = getEvictionInterval(properties);
+        initializeClientPoolCache();
+    }
+
+    private int getClientPoolSize(Map<String, String> properties) {
+        return PropertyUtil.propertyAsInt(
                 properties,
                 CatalogProperties.CLIENT_POOL_SIZE,
-                CatalogProperties.CLIENT_POOL_SIZE_DEFAULT);
-        this.evictionInterval =
-            PropertyUtil.propertyAsLong(
+                CatalogProperties.CLIENT_POOL_SIZE_DEFAULT
+        );
+    }
+
+    private long getEvictionInterval(Map<String, String> properties) {
+        return PropertyUtil.propertyAsLong(
                 properties,
                 CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
-                CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT);
+                CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT
+        );
+    }
 
+    private void initializeClientPoolCache() {
         if (clientPoolCache == null) {
             synchronized (clientPoolCacheLock) {
                 if (clientPoolCache == null) {
-                    clientPoolCache =
-                        Caffeine.newBuilder()
+                    clientPoolCache = Caffeine.newBuilder()
                             .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
                             .removalListener((key, value, cause) -> ((DLFClientPool) value).close())
                             .build();
@@ -80,3 +90,4 @@ public class DLFCachedClientPool implements ClientPool<IMetaStoreClient, TExcept
         return clientPool().run(action, retry);
     }
 }
+
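
The DLFCachedClientPool change above keeps the lazily initialized, double-checked Caffeine
cache while moving the property parsing into small helpers. As a standalone, illustrative
sketch of that caching pattern only (the class and field names below are placeholders, not
the Doris ones):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    import java.util.concurrent.TimeUnit;

    class ClientPoolCacheSketch {
        // Hypothetical pooled resource standing in for DLFClientPool.
        static class Pool {
            void close() { /* release the underlying clients here */ }
        }

        // volatile so the double-checked locking below publishes the cache safely.
        private static volatile Cache<String, Pool> clientPoolCache;
        private static final Object lock = new Object();

        static Cache<String, Pool> getOrCreateCache(long evictionIntervalMs) {
            if (clientPoolCache == null) {            // first check, without locking
                synchronized (lock) {
                    if (clientPoolCache == null) {    // second check, under the lock
                        clientPoolCache = Caffeine.newBuilder()
                                // entries untouched for evictionIntervalMs are evicted ...
                                .expireAfterAccess(evictionIntervalMs, TimeUnit.MILLISECONDS)
                                // ... and the pool they hold is closed when that happens
                                .removalListener((key, value, cause) -> ((Pool) value).close())
                                .build();
                    }
                }
            }
            return clientPoolCache;
        }
    }

The double-checked initialization ensures the shared cache is built only once even if several
catalog instances are constructed concurrently.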


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
