This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ae2a139f8b [opt](catalog) merge scan range to avoid too many splits (#38311)
2ae2a139f8b is described below

commit 2ae2a139f8b9803320044d5dc2d602ff749eaba4
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Tue Aug 6 17:53:22 2024 +0800

    [opt](catalog) merge scan range to avoid too many splits (#38311)

    PR #34032 introduced a new method to fetch splits batch by batch,
    but it removed the logic by which BE merges scan ranges to avoid
    scheduling too many of them.

    This PR mainly changes:
    1. Add the scan range merging logic back.
    2. Change the default file split size from 8MB to 64MB, to avoid too
    many small splits.
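
    For context on the merging below: BE distributes N incoming splits
    across at most max_scanners merged scan ranges, giving the first
    N mod max_scanners of them one extra split. A minimal standalone
    C++ sketch of just that arithmetic (illustrative names only, not
    the actual BE types):

        #include <vector>

        // How many source splits each merged scan range receives.
        // Mirrors the num_ranges / num_add_one arithmetic in _merge_ranges.
        std::vector<int> bucket_sizes(int num_splits, int max_scanners) {
            std::vector<int> sizes(max_scanners, num_splits / max_scanners);
            int extra = num_splits % max_scanners; // first `extra` buckets get one more
            for (int i = 0; i < extra; ++i) {
                ++sizes[i];
            }
            return sizes;
        }

        // e.g. bucket_sizes(10, 4) -> {3, 3, 2, 2}: 10 splits merged into 4 ranges.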
---
 be/src/pipeline/exec/file_scan_operator.cpp        |  6 ++-
 be/src/vec/exec/scan/split_source_connector.cpp    |  2 +-
 be/src/vec/exec/scan/split_source_connector.h      | 55 ++++++++++++++++++++--
 .../doris/analysis/AlterDatabaseQuotaStmt.java     |  2 +-
 .../org/apache/doris/analysis/CreateTableStmt.java |  2 +-
 .../org/apache/doris/analysis/OutFileClause.java   |  2 +-
 .../java/org/apache/doris/analysis/SetVar.java     | 18 +++++--
 .../org/apache/doris/common/util/ParseUtil.java    |  2 +-
 .../org/apache/doris/datasource/FileScanNode.java  | 26 +++++-----
 .../doris/datasource/hive/HiveMetaStoreCache.java  | 14 +-----
 .../doris/datasource/hive/source/HiveScanNode.java |  4 --
 .../datasource/iceberg/source/IcebergScanNode.java |  7 +--
 .../doris/datasource/tvf/source/TVFScanNode.java   |  4 +-
 .../trees/plans/commands/info/CreateTableInfo.java |  2 +-
 14 files changed, 95 insertions(+), 51 deletions(-)

diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp
index 98cc91824f6..d73cfc405fd 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -73,11 +73,13 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
             auto split_source = scan_range.split_source;
             RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime");
             _split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>(
-                    state, get_split_timer, split_source.split_source_id, split_source.num_splits);
+                    state, get_split_timer, split_source.split_source_id, split_source.num_splits,
+                    _max_scanners);
         }
     }
     if (_split_source == nullptr) {
-        _split_source = std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges);
+        _split_source =
+                std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges, _max_scanners);
     }
     _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
     if (scan_ranges.size() > 0 &&
diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp
index 6533ae2bfe0..478af522e76 100644
--- a/be/src/vec/exec/scan/split_source_connector.cpp
+++ b/be/src/vec/exec/scan/split_source_connector.cpp
@@ -60,7 +60,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang
             return Status::IOError<false>("Failed to get batch of split source: {}", e.what());
         }
         _last_batch = result.splits.empty();
-        _scan_ranges = result.splits;
+        _merge_ranges<TScanRangeLocations>(_scan_ranges, result.splits);
         _scan_index = 0;
         _range_index = 0;
     }
diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h
index f62b45612bf..8f38cd4f17a 100644
--- a/be/src/vec/exec/scan/split_source_connector.h
+++ b/be/src/vec/exec/scan/split_source_connector.h
@@ -43,6 +43,49 @@ public:
     virtual int num_scan_ranges() = 0;
 
     virtual TFileScanRangeParams* get_params() = 0;
+
+protected:
+    template <typename T>
+    void _merge_ranges(std::vector<T>& merged_ranges, const std::vector<T>& scan_ranges) {
+        if (scan_ranges.size() <= _max_scanners) {
+            merged_ranges = scan_ranges;
+            return;
+        }
+
+        // There is no need for the number of scanners to exceed the number of threads in thread pool.
+        // scan_ranges is sorted by path (as well as partition path) in FE, so merge scan ranges in order.
+        // In the insert statement, reading data in partition order can reduce the memory usage of BE
+        // and prevent the generation of smaller tables.
+        merged_ranges.resize(_max_scanners);
+        int num_ranges = scan_ranges.size() / _max_scanners;
+        int num_add_one = scan_ranges.size() - num_ranges * _max_scanners;
+        int scan_index = 0;
+        int range_index = 0;
+        for (int i = 0; i < num_add_one; ++i) {
+            merged_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges; j++) {
+                auto& src_ranges =
+                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), src_ranges.begin(), src_ranges.end());
+            }
+        }
+        for (int i = num_add_one; i < _max_scanners; ++i) {
+            merged_ranges[scan_index] = scan_ranges[range_index++];
+            auto& ranges =
+                    merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+            for (int j = 0; j < num_ranges - 1; j++) {
+                auto& src_ranges =
+                        scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges;
+                ranges.insert(ranges.end(), src_ranges.begin(), src_ranges.end());
+            }
+        }
+        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size();
+    }
+
+protected:
+    int _max_scanners;
+
+protected:
+    int _max_scanners;
 };
 
 /**
@@ -59,8 +102,10 @@ private:
     int _range_index = 0;
 
 public:
-    LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges)
-            : _scan_ranges(scan_ranges) {}
+    LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges, int max_scanners) {
+        _max_scanners = max_scanners;
+        _merge_ranges<TScanRangeParams>(_scan_ranges, scan_ranges);
+    }
 
     Status get_next(bool* has_next, TFileRangeDesc* range) override;
 
@@ -98,11 +143,13 @@ private:
 
 public:
     RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer,
-                               int64 split_source_id, int num_splits)
+                               int64 split_source_id, int num_splits, int max_scanners)
             : _state(state),
               _get_split_timer(get_split_timer),
               _split_source_id(split_source_id),
-              _num_splits(num_splits) {}
+              _num_splits(num_splits) {
+        _max_scanners = max_scanners;
+    }
 
     Status get_next(bool* has_next, TFileRangeDesc* range) override;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java
index 552b701d029..e832e193ab3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java
@@ -80,7 +80,7 @@ public class AlterDatabaseQuotaStmt extends DdlStmt {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
         }
         if (quotaType == QuotaType.DATA) {
-            quota = ParseUtil.analyzeDataVolumn(quotaValue);
+            quota = ParseUtil.analyzeDataVolume(quotaValue);
         } else if (quotaType == QuotaType.REPLICA) {
             quota = ParseUtil.analyzeReplicaNumber(quotaValue);
         } else if (quotaType == QuotaType.TRANSACTION) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index a5b39228c17..fcdc38b9994 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -127,7 +127,7 @@ public class CreateTableStmt extends DdlStmt implements NotFallbackInParser {
             distributionDesc.setBuckets(FeConstants.default_bucket_num);
         } else {
             long partitionSize = ParseUtil
-                    .analyzeDataVolumn(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
+                    .analyzeDataVolume(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
             distributionDesc.setBuckets(AutoBucketUtils.getBucketsNum(partitionSize, Config.autobucket_min_buckets));
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 8dd91c3fd8a..6debdca789f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -573,7 +573,7 @@ public class OutFileClause {
         }
 
         if (properties.containsKey(PROP_MAX_FILE_SIZE)) {
-            maxFileSizeBytes = ParseUtil.analyzeDataVolumn(properties.get(PROP_MAX_FILE_SIZE));
+            maxFileSizeBytes = ParseUtil.analyzeDataVolume(properties.get(PROP_MAX_FILE_SIZE));
             if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) {
                 throw new AnalysisException("max file size should between 5MB and 2GB. Given: " + maxFileSizeBytes);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
index 6142feec895..e80860bf584 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
@@ -179,12 +179,22 @@ public class SetVar {
             this.result = (LiteralExpr) this.value;
         }
 
-        if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) {
-            this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue())));
+        if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)
+                || getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) {
+            this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue())));
             this.result = (LiteralExpr) this.value;
         }
-        if (getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) {
-            this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue())));
+        if (getVariable().equalsIgnoreCase(SessionVariable.FILE_SPLIT_SIZE)) {
+            try {
+                this.value = new StringLiteral(
+                        Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue())));
+            } catch (Throwable t) {
+                // The handling of file_split_size should be the same as exec_mem_limit or scan_queue_mem_limit,
+                // but ParseUtil.analyzeDataVolume() does not accept 0 as a valid value.
+                // So for compatibility, we keep the original value for file_split_size
+                // when the value is 0 or otherwise invalid.
+                this.value = new StringLiteral(getResult().getStringValue());
+            }
             this.result = (LiteralExpr) this.value;
         }
         if (getVariable().equalsIgnoreCase("is_report_success")) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java
index c47753d2d42..649f03ffcc7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java
@@ -41,7 +41,7 @@ public class ParseUtil {
 
     private static Pattern dataVolumnPattern = Pattern.compile("(\\d+)(\\D*)");
 
-    public static long analyzeDataVolumn(String dataVolumnStr) throws AnalysisException {
+    public static long analyzeDataVolume(String dataVolumnStr) throws AnalysisException {
         long dataVolumn = 0;
         Matcher m = dataVolumnPattern.matcher(dataVolumnStr);
         if (m.matches()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 48c4a9ff3f4..c8c4323d34d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -26,7 +26,6 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.FileSplit.FileSplitCreator;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
@@ -64,12 +63,13 @@ import java.util.stream.Collectors;
 public abstract class FileScanNode extends ExternalScanNode {
     private static final Logger LOG = LogManager.getLogger(FileScanNode.class);
 
-    public static final long DEFAULT_SPLIT_SIZE = 8 * 1024 * 1024; // 8MB
+    public static final long DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024; // 64MB
 
     // For explain
     protected long totalFileSize = 0;
     protected long totalPartitionNum = 0;
     protected long fileSplitSize;
+    protected boolean isSplitSizeSetBySession = false;
 
     public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
             boolean needCheckColumnPriv) {
@@ -79,7 +79,15 @@ public abstract class FileScanNode extends ExternalScanNode {
 
     @Override
     public void init() throws UserException {
+        initFileSplitSize();
+    }
+
+    private void initFileSplitSize() {
         this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
+        this.isSplitSizeSetBySession = this.fileSplitSize > 0;
+        if (this.fileSplitSize <= 0) {
+            this.fileSplitSize = DEFAULT_SPLIT_SIZE;
+        }
     }
 
     @Override
@@ -249,12 +257,6 @@ public abstract class FileScanNode extends ExternalScanNode {
         }
     }
 
-    protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
-            long modificationTime, boolean splittable, List<String> partitionValues) throws IOException {
-        return splitFile(path, blockSize, blockLocations, length, modificationTime, splittable, partitionValues,
-                FileSplitCreator.DEFAULT);
-    }
-
     protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
             long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
             throws IOException {
@@ -271,11 +273,11 @@ public abstract class FileScanNode extends ExternalScanNode {
             result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues));
             return result;
         }
-        if (fileSplitSize <= 0) {
-            fileSplitSize = blockSize;
+        // If the file split size is set by the session variable, use it as-is.
+        // Otherwise, use max(file split size, block size).
+        if (!isSplitSizeSetBySession) {
+            fileSplitSize = Math.max(fileSplitSize, blockSize);
         }
-        // Min split size is DEFAULT_SPLIT_SIZE(128MB).
-        fileSplitSize = Math.max(fileSplitSize, DEFAULT_SPLIT_SIZE);
         long bytesRemaining;
         for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D;
                 bytesRemaining -= fileSplitSize) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 7f23385d847..da88a03f2eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -35,7 +35,6 @@ import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.util.CacheBulkLoader;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.CacheException;
-import org.apache.doris.datasource.FileSplit;
 import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.FileSystemCache;
@@ -512,8 +511,7 @@ public class HiveMetaStoreCache {
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms",
-                    fileLists.stream().mapToInt(l -> l.getFiles() == null
-                            ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(),
+                    fileLists.stream().mapToInt(l -> l.getFiles() == null ? 0 : l.getFiles().size()).sum(),
                     partitions.size(), catalog.getName(), (System.currentTimeMillis() - start));
         }
         return fileLists;
@@ -992,9 +990,6 @@ public class HiveMetaStoreCache {
     public static class FileCacheValue {
         // File Cache for self splitter.
         private final List<HiveFileStatus> files = Lists.newArrayList();
-        // File split cache for old splitter. This is a temp variable.
-        @Deprecated
-        private final List<FileSplit> splits = Lists.newArrayList();
         private boolean isSplittable;
         // The values of partitions.
         // e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile
@@ -1015,13 +1010,6 @@ public class HiveMetaStoreCache {
             }
         }
 
-        @Deprecated
-        public void addSplit(FileSplit split) {
-            if (isFileVisible(split.getPath())) {
-                splits.add(split);
-            }
-        }
-
         public int getValuesSize() {
             return partitionValues == null ? 0 : partitionValues.size();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 1bdb805f0fd..6ef551825e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -320,10 +320,6 @@ public class HiveScanNode extends FileQueryScanNode {
             return;
         }
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
-            // This if branch is to support old splitter, will remove later.
-            if (fileCacheValue.getSplits() != null) {
-                allFiles.addAll(fileCacheValue.getSplits());
-            }
             if (fileCacheValue.getFiles() != null) {
                 boolean isSplittable = fileCacheValue.isSplittable();
                 for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index be30921e46a..56222d84955 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -37,7 +37,6 @@ import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -205,14 +204,12 @@ public class IcebergScanNode extends FileQueryScanNode {
         // get splits
         List<Split> splits = new ArrayList<>();
         int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
-        // Min split size is DEFAULT_SPLIT_SIZE(128MB).
-        long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE);
         HashSet<String> partitionPathSet = new HashSet<>();
         boolean isPartitionedTable = icebergTable.spec().isPartitioned();
 
-        CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
+        CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), fileSplitSize);
         try (CloseableIterable<CombinedScanTask> combinedScanTasks =
-                TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
+                TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) {
             combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
                 String dataFilePath = normalizeLocation(splitTask.file().path().toString());
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index 96e96d3cf19..26b90c26a46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.datasource.FileSplit.FileSplitCreator;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -139,7 +140,8 @@ public class TVFScanNode extends FileQueryScanNode {
             Path path = new Path(fileStatus.getPath());
             try {
                 splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(),
-                        fileStatus.getModificationTime(), fileStatus.isSplitable, null));
+                        fileStatus.getModificationTime(), fileStatus.isSplitable, null,
+                        FileSplitCreator.DEFAULT));
             } catch (IOException e) {
                 LOG.warn("get file split failed for TVF: {}", path, e);
                 throw new UserException(e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index b8b99c055ba..4bbae8d4e78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -694,7 +694,7 @@ public class CreateTableInfo {
             if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
                 distributionDesc.updateBucketNum(FeConstants.default_bucket_num);
             } else {
-                long partitionSize = ParseUtil.analyzeDataVolumn(
+                long partitionSize = ParseUtil.analyzeDataVolume(
                         newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
                 distributionDesc.updateBucketNum(AutoBucketUtils.getBucketsNum(partitionSize,
                         Config.autobucket_min_buckets));

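On the FE side, the net effect of these changes is that the split size now
resolves to the session variable file_split_size when it is positive;
otherwise the new 64MB default applies, widened to the file's block size at
split time. A hedged C++ sketch of just that decision (the real code is Java
in FileScanNode; names here are illustrative):

    #include <algorithm>
    #include <cstdint>

    constexpr int64_t kDefaultSplitSize = 64LL * 1024 * 1024; // 64MB, per this commit

    // session_split_size <= 0 means "not set by the session";
    // block_size comes from the file's storage layer.
    int64_t resolve_split_size(int64_t session_split_size, int64_t block_size) {
        if (session_split_size > 0) {
            return session_split_size; // an explicit session value wins as-is
        }
        return std::max(kDefaultSplitSize, block_size); // default, widened to block size
    }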
