This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2ae2a139f8b [opt](catalog) merge scan range to avoid too many splits (#38311) 2ae2a139f8b is described below commit 2ae2a139f8b9803320044d5dc2d602ff749eaba4 Author: Mingyu Chen <morning...@163.com> AuthorDate: Tue Aug 6 17:53:22 2024 +0800 [opt](catalog) merge scan range to avoid too many splits (#38311) PR #34032 introduce a new method to get splits batch by batch, but it removed a logic that BE will merge scan ranges to avoid too many scan ranges being scheduled. This PR mainly changes: 1. Add scan range merging logic back. 2. Change the default file split size from 8MB to 64MB, to avoid too many small split. --- be/src/pipeline/exec/file_scan_operator.cpp | 6 ++- be/src/vec/exec/scan/split_source_connector.cpp | 2 +- be/src/vec/exec/scan/split_source_connector.h | 55 ++++++++++++++++++++-- .../doris/analysis/AlterDatabaseQuotaStmt.java | 2 +- .../org/apache/doris/analysis/CreateTableStmt.java | 2 +- .../org/apache/doris/analysis/OutFileClause.java | 2 +- .../java/org/apache/doris/analysis/SetVar.java | 18 +++++-- .../org/apache/doris/common/util/ParseUtil.java | 2 +- .../org/apache/doris/datasource/FileScanNode.java | 26 +++++----- .../doris/datasource/hive/HiveMetaStoreCache.java | 14 +----- .../doris/datasource/hive/source/HiveScanNode.java | 4 -- .../datasource/iceberg/source/IcebergScanNode.java | 7 +-- .../doris/datasource/tvf/source/TVFScanNode.java | 4 +- .../trees/plans/commands/info/CreateTableInfo.java | 2 +- 14 files changed, 95 insertions(+), 51 deletions(-) diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 98cc91824f6..d73cfc405fd 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -73,11 +73,13 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state, auto split_source = scan_range.split_source; RuntimeProfile::Counter* get_split_timer = ADD_TIMER(_runtime_profile, "GetSplitTime"); _split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>( - state, get_split_timer, split_source.split_source_id, split_source.num_splits); + state, get_split_timer, split_source.split_source_id, split_source.num_splits, + _max_scanners); } } if (_split_source == nullptr) { - _split_source = std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges); + _split_source = + std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges, _max_scanners); } _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges()); if (scan_ranges.size() > 0 && diff --git a/be/src/vec/exec/scan/split_source_connector.cpp b/be/src/vec/exec/scan/split_source_connector.cpp index 6533ae2bfe0..478af522e76 100644 --- a/be/src/vec/exec/scan/split_source_connector.cpp +++ b/be/src/vec/exec/scan/split_source_connector.cpp @@ -60,7 +60,7 @@ Status RemoteSplitSourceConnector::get_next(bool* has_next, TFileRangeDesc* rang return Status::IOError<false>("Failed to get batch of split source: {}", e.what()); } _last_batch = result.splits.empty(); - _scan_ranges = result.splits; + _merge_ranges<TScanRangeLocations>(_scan_ranges, result.splits); _scan_index = 0; _range_index = 0; } diff --git a/be/src/vec/exec/scan/split_source_connector.h b/be/src/vec/exec/scan/split_source_connector.h index f62b45612bf..8f38cd4f17a 100644 --- a/be/src/vec/exec/scan/split_source_connector.h +++ b/be/src/vec/exec/scan/split_source_connector.h @@ -43,6 +43,49 @@ public: virtual int num_scan_ranges() = 0; virtual TFileScanRangeParams* get_params() = 0; + +protected: + template <typename T> + void _merge_ranges(std::vector<T>& merged_ranges, const std::vector<T>& scan_ranges) { + if (scan_ranges.size() <= _max_scanners) { + merged_ranges = scan_ranges; + return; + } + + // There is no need for the number of scanners to exceed the number of threads in thread pool. + // scan_ranges is sorted by path(as well as partition path) in FE, so merge scan ranges in order. + // In the insert statement, reading data in partition order can reduce the memory usage of BE + // and prevent the generation of smaller tables. + merged_ranges.resize(_max_scanners); + int num_ranges = scan_ranges.size() / _max_scanners; + int num_add_one = scan_ranges.size() - num_ranges * _max_scanners; + int scan_index = 0; + int range_index = 0; + for (int i = 0; i < num_add_one; ++i) { + merged_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } + } + for (int i = num_add_one; i < _max_scanners; ++i) { + merged_ranges[scan_index] = scan_ranges[range_index++]; + auto& ranges = + merged_ranges[scan_index++].scan_range.ext_scan_range.file_scan_range.ranges; + for (int j = 0; j < num_ranges - 1; j++) { + auto& merged_ranges = + scan_ranges[range_index++].scan_range.ext_scan_range.file_scan_range.ranges; + ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end()); + } + } + LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << merged_ranges.size(); + } + +protected: + int _max_scanners; }; /** @@ -59,8 +102,10 @@ private: int _range_index = 0; public: - LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges) - : _scan_ranges(scan_ranges) {} + LocalSplitSourceConnector(const std::vector<TScanRangeParams>& scan_ranges, int max_scanners) { + _max_scanners = max_scanners; + _merge_ranges<TScanRangeParams>(_scan_ranges, scan_ranges); + } Status get_next(bool* has_next, TFileRangeDesc* range) override; @@ -98,11 +143,13 @@ private: public: RemoteSplitSourceConnector(RuntimeState* state, RuntimeProfile::Counter* get_split_timer, - int64 split_source_id, int num_splits) + int64 split_source_id, int num_splits, int max_scanners) : _state(state), _get_split_timer(get_split_timer), _split_source_id(split_source_id), - _num_splits(num_splits) {} + _num_splits(num_splits) { + _max_scanners = max_scanners; + } Status get_next(bool* has_next, TFileRangeDesc* range) override; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java index 552b701d029..e832e193ab3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterDatabaseQuotaStmt.java @@ -80,7 +80,7 @@ public class AlterDatabaseQuotaStmt extends DdlStmt { ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); } if (quotaType == QuotaType.DATA) { - quota = ParseUtil.analyzeDataVolumn(quotaValue); + quota = ParseUtil.analyzeDataVolume(quotaValue); } else if (quotaType == QuotaType.REPLICA) { quota = ParseUtil.analyzeReplicaNumber(quotaValue); } else if (quotaType == QuotaType.TRANSACTION) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index a5b39228c17..fcdc38b9994 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -127,7 +127,7 @@ public class CreateTableStmt extends DdlStmt implements NotFallbackInParser { distributionDesc.setBuckets(FeConstants.default_bucket_num); } else { long partitionSize = ParseUtil - .analyzeDataVolumn(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)); + .analyzeDataVolume(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)); distributionDesc.setBuckets(AutoBucketUtils.getBucketsNum(partitionSize, Config.autobucket_min_buckets)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 8dd91c3fd8a..6debdca789f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -573,7 +573,7 @@ public class OutFileClause { } if (properties.containsKey(PROP_MAX_FILE_SIZE)) { - maxFileSizeBytes = ParseUtil.analyzeDataVolumn(properties.get(PROP_MAX_FILE_SIZE)); + maxFileSizeBytes = ParseUtil.analyzeDataVolume(properties.get(PROP_MAX_FILE_SIZE)); if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) { throw new AnalysisException("max file size should between 5MB and 2GB. Given: " + maxFileSizeBytes); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java index 6142feec895..e80860bf584 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java @@ -179,12 +179,22 @@ public class SetVar { this.result = (LiteralExpr) this.value; } - if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) { - this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue()))); + if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT) + || getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) { + this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue()))); this.result = (LiteralExpr) this.value; } - if (getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) { - this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue()))); + if (getVariable().equalsIgnoreCase(SessionVariable.FILE_SPLIT_SIZE)) { + try { + this.value = new StringLiteral( + Long.toString(ParseUtil.analyzeDataVolume(getResult().getStringValue()))); + } catch (Throwable t) { + // The way of handling file_split_size should be same as exec_mem_limit or scan_queue_mem_limit. + // But ParseUtil.analyzeDataVolume() does not accept 0 as a valid value. + // So for compatibility, we set origin value to file_split_size + // when the value is 0 or other invalid value. + this.value = new StringLiteral(getResult().getStringValue()); + } this.result = (LiteralExpr) this.value; } if (getVariable().equalsIgnoreCase("is_report_success")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java index c47753d2d42..649f03ffcc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ParseUtil.java @@ -41,7 +41,7 @@ public class ParseUtil { private static Pattern dataVolumnPattern = Pattern.compile("(\\d+)(\\D*)"); - public static long analyzeDataVolumn(String dataVolumnStr) throws AnalysisException { + public static long analyzeDataVolume(String dataVolumnStr) throws AnalysisException { long dataVolumn = 0; Matcher m = dataVolumnPattern.matcher(dataVolumnStr); if (m.matches()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index 48c4a9ff3f4..c8c4323d34d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -26,7 +26,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; -import org.apache.doris.datasource.FileSplit.FileSplitCreator; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; @@ -64,12 +63,13 @@ import java.util.stream.Collectors; public abstract class FileScanNode extends ExternalScanNode { private static final Logger LOG = LogManager.getLogger(FileScanNode.class); - public static final long DEFAULT_SPLIT_SIZE = 8 * 1024 * 1024; // 8MB + public static final long DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024; // 64MB // For explain protected long totalFileSize = 0; protected long totalPartitionNum = 0; protected long fileSplitSize; + protected boolean isSplitSizeSetBySession = false; public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -79,7 +79,15 @@ public abstract class FileScanNode extends ExternalScanNode { @Override public void init() throws UserException { + initFileSplitSize(); + } + + private void initFileSplitSize() { this.fileSplitSize = ConnectContext.get().getSessionVariable().getFileSplitSize(); + this.isSplitSizeSetBySession = this.fileSplitSize > 0; + if (this.fileSplitSize <= 0) { + this.fileSplitSize = DEFAULT_SPLIT_SIZE; + } } @Override @@ -249,12 +257,6 @@ public abstract class FileScanNode extends ExternalScanNode { } } - protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length, - long modificationTime, boolean splittable, List<String> partitionValues) throws IOException { - return splitFile(path, blockSize, blockLocations, length, modificationTime, splittable, partitionValues, - FileSplitCreator.DEFAULT); - } - protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length, long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator) throws IOException { @@ -271,11 +273,11 @@ public abstract class FileScanNode extends ExternalScanNode { result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues)); return result; } - if (fileSplitSize <= 0) { - fileSplitSize = blockSize; + // if file split size is set by session variable, use session variable. + // Otherwise, use max(file split size, block size) + if (!isSplitSizeSetBySession) { + fileSplitSize = Math.max(fileSplitSize, blockSize); } - // Min split size is DEFAULT_SPLIT_SIZE(128MB). - fileSplitSize = Math.max(fileSplitSize, DEFAULT_SPLIT_SIZE); long bytesRemaining; for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D; bytesRemaining -= fileSplitSize) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index 7f23385d847..da88a03f2eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -35,7 +35,6 @@ import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.util.CacheBulkLoader; import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.CacheException; -import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.fs.FileSystemCache; @@ -512,8 +511,7 @@ public class HiveMetaStoreCache { if (LOG.isDebugEnabled()) { LOG.debug("get #{} files from #{} partitions in catalog {} cost: {} ms", - fileLists.stream().mapToInt(l -> l.getFiles() == null - ? (l.getSplits() == null ? 0 : l.getSplits().size()) : l.getFiles().size()).sum(), + fileLists.stream().mapToInt(l -> l.getFiles() == null ? 0 : l.getFiles().size()).sum(), partitions.size(), catalog.getName(), (System.currentTimeMillis() - start)); } return fileLists; @@ -992,9 +990,6 @@ public class HiveMetaStoreCache { public static class FileCacheValue { // File Cache for self splitter. private final List<HiveFileStatus> files = Lists.newArrayList(); - // File split cache for old splitter. This is a temp variable. - @Deprecated - private final List<FileSplit> splits = Lists.newArrayList(); private boolean isSplittable; // The values of partitions. // e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile @@ -1015,13 +1010,6 @@ public class HiveMetaStoreCache { } } - @Deprecated - public void addSplit(FileSplit split) { - if (isFileVisible(split.getPath())) { - splits.add(split); - } - } - public int getValuesSize() { return partitionValues == null ? 0 : partitionValues.size(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 1bdb805f0fd..6ef551825e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -320,10 +320,6 @@ public class HiveScanNode extends FileQueryScanNode { return; } for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) { - // This if branch is to support old splitter, will remove later. - if (fileCacheValue.getSplits() != null) { - allFiles.addAll(fileCacheValue.getSplits()); - } if (fileCacheValue.getFiles() != null) { boolean isSplittable = fileCacheValue.isSplittable(); for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index be30921e46a..56222d84955 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -37,7 +37,6 @@ import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; @@ -205,14 +204,12 @@ public class IcebergScanNode extends FileQueryScanNode { // get splits List<Split> splits = new ArrayList<>(); int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); - // Min split size is DEFAULT_SPLIT_SIZE(128MB). - long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE); HashSet<String> partitionPathSet = new HashSet<>(); boolean isPartitionedTable = icebergTable.spec().isPartitioned(); - CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize); + CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), fileSplitSize); try (CloseableIterable<CombinedScanTask> combinedScanTasks = - TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) { + TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) { combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> { String dataFilePath = normalizeLocation(splitTask.file().path().toString()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java index 96e96d3cf19..26b90c26a46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java @@ -27,6 +27,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; +import org.apache.doris.datasource.FileSplit.FileSplitCreator; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; @@ -139,7 +140,8 @@ public class TVFScanNode extends FileQueryScanNode { Path path = new Path(fileStatus.getPath()); try { splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(), - fileStatus.getModificationTime(), fileStatus.isSplitable, null)); + fileStatus.getModificationTime(), fileStatus.isSplitable, null, + FileSplitCreator.DEFAULT)); } catch (IOException e) { LOG.warn("get file split failed for TVF: {}", path, e); throw new UserException(e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java index b8b99c055ba..4bbae8d4e78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java @@ -694,7 +694,7 @@ public class CreateTableInfo { if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) { distributionDesc.updateBucketNum(FeConstants.default_bucket_num); } else { - long partitionSize = ParseUtil.analyzeDataVolumn( + long partitionSize = ParseUtil.analyzeDataVolume( newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)); distributionDesc.updateBucketNum(AutoBucketUtils.getBucketsNum(partitionSize, Config.autobucket_min_buckets)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org