This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f07b884819 [improvement](external)add some improvements for external 
scan (#38946)
5f07b884819 is described below

commit 5f07b884819a0187862d0005ec60cae96edcafea
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Sun Nov 3 23:44:58 2024 +0800

    [improvement](external)add some improvements for external scan (#38946)
    
    ## Proposed changes
    
    1. Add session variable `use_consistent_hash_for_external_scan`, which
    makes external table scans assign splits to backends using consistent hashing.
    2. Add session variable `ignore_split_type` (`NONE`, `IGNORE_JNI`, or
    `IGNORE_NATIVE`), which skips splits of the specified type (currently applied
    to Paimon scans); intended for performance tuning.
    3. Add split weights for Paimon (and Iceberg) splits, used when assigning splits with consistent hashing.
    4. Call `executeFilter()` when creating the reader for Paimon JNI splits.
---
 .../org/apache/doris/paimon/PaimonJniScanner.java  |  2 +-
 .../apache/doris/datasource/ExternalScanNode.java  |  3 +-
 .../doris/datasource/FederationBackendPolicy.java  |  2 +-
 .../org/apache/doris/datasource/FileSplit.java     | 19 +++++
 .../iceberg/source/IcebergDeleteFileFilter.java    | 20 +++--
 .../datasource/iceberg/source/IcebergScanNode.java |  7 +-
 .../datasource/iceberg/source/IcebergSplit.java    |  6 ++
 .../datasource/paimon/source/PaimonScanNode.java   | 25 +++++-
 .../datasource/paimon/source/PaimonSplit.java      | 22 +++++
 .../glue/translator/PhysicalPlanTranslator.java    |  3 +-
 .../apache/doris/planner/SingleNodePlanner.java    |  3 +-
 .../java/org/apache/doris/qe/SessionVariable.java  | 36 ++++++++
 .../src/main/java/org/apache/doris/spi/Split.java  |  5 ++
 .../doris/planner/FederationBackendPolicyTest.java | 95 ++++++++++++++++++++++
 14 files changed, 228 insertions(+), 20 deletions(-)

diff --git 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
index f229134e9d8..7bd9fa631c8 100644
--- 
a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
+++ 
b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java
@@ -125,7 +125,7 @@ public class PaimonJniScanner extends JniScanner {
         int[] projected = getProjected();
         readBuilder.withProjection(projected);
         readBuilder.withFilter(getPredicates());
-        reader = readBuilder.newRead().createReader(getSplit());
+        reader = 
readBuilder.newRead().executeFilter().createReader(getSplit());
         paimonDataTypeList =
             Arrays.stream(projected).mapToObj(i -> 
table.rowType().getTypeAt(i)).collect(Collectors.toList());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
index e85fed8b62a..0d67a9e44b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
@@ -46,7 +46,8 @@ public abstract class ExternalScanNode extends ScanNode {
     protected boolean needCheckColumnPriv;
 
     protected final FederationBackendPolicy backendPolicy = 
(ConnectContext.get() != null
-            && ConnectContext.get().getSessionVariable().enableFileCache)
+            && (ConnectContext.get().getSessionVariable().enableFileCache
+                || 
ConnectContext.get().getSessionVariable().getUseConsistentHashForExternalScan()))
             ? new 
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING)
             : new FederationBackendPolicy();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index a2b902fd744..1e1787c1f64 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -497,7 +497,7 @@ public class FederationBackendPolicy {
     private static class SplitHash implements Funnel<Split> {
         @Override
         public void funnel(Split split, PrimitiveSink primitiveSink) {
-            
primitiveSink.putBytes(split.getPathString().getBytes(StandardCharsets.UTF_8));
+            
primitiveSink.putBytes(split.getConsistentHashString().getBytes(StandardCharsets.UTF_8));
             primitiveSink.putLong(split.getStart());
             primitiveSink.putLong(split.getLength());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
index 7eaa87b74aa..1ebb390e904 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplit.java
@@ -47,6 +47,9 @@ public class FileSplit implements Split {
     // the location type for BE, eg: HDFS, LOCAL, S3
     protected TFileType locationType;
 
+    public Long selfSplitWeight;
+    public Long targetSplitSize;
+
     public FileSplit(LocationPath path, long start, long length, long 
fileLength,
             long modificationTime, String[] hosts, List<String> 
partitionValues) {
         this.path = path;
@@ -89,4 +92,20 @@ public class FileSplit implements Split {
             return new FileSplit(path, start, length, fileLength, 
modificationTime, hosts, partitionValues);
         }
     }
+
+    @Override
+    public void setTargetSplitSize(Long targetSplitSize) {
+        this.targetSplitSize = targetSplitSize;
+    }
+
+    @Override
+    public SplitWeight getSplitWeight() {
+        if (selfSplitWeight != null && targetSplitSize != null) {
+            double computedWeight = selfSplitWeight * 1.0 / targetSplitSize;
+            // Clamp the value be between the minimum weight and 1.0 (standard 
weight)
+            return 
SplitWeight.fromProportion(Math.min(Math.max(computedWeight, 0.01), 1.0));
+        } else {
+            return SplitWeight.standard();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
index 394bc849a56..b876732ff3f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
@@ -25,23 +25,25 @@ import java.util.OptionalLong;
 @Data
 public class IcebergDeleteFileFilter {
     private String deleteFilePath;
+    private long filesize;
 
-    public IcebergDeleteFileFilter(String deleteFilePath) {
+    public IcebergDeleteFileFilter(String deleteFilePath, long filesize) {
         this.deleteFilePath = deleteFilePath;
+        this.filesize = filesize;
     }
 
     public static PositionDelete createPositionDelete(String deleteFilePath, 
Long positionLowerBound,
-                                                      Long positionUpperBound) 
{
-        return new PositionDelete(deleteFilePath, positionLowerBound, 
positionUpperBound);
+                                                      Long positionUpperBound, 
long filesize) {
+        return new PositionDelete(deleteFilePath, positionLowerBound, 
positionUpperBound, filesize);
     }
 
-    public static EqualityDelete createEqualityDelete(String deleteFilePath, 
List<Integer> fieldIds) {
+    public static EqualityDelete createEqualityDelete(String deleteFilePath, 
List<Integer> fieldIds, long fileSize) {
         // todo:
         // Schema deleteSchema = TypeUtil.select(scan.schema(), new 
HashSet<>(fieldIds));
         // StructLikeSet deleteSet = 
StructLikeSet.create(deleteSchema.asStruct());
         // pass deleteSet to BE
         // compare two StructLike value, if equals, filtered
-        return new EqualityDelete(deleteFilePath, fieldIds);
+        return new EqualityDelete(deleteFilePath, fieldIds, fileSize);
     }
 
     static class PositionDelete extends IcebergDeleteFileFilter {
@@ -49,8 +51,8 @@ public class IcebergDeleteFileFilter {
         private final Long positionUpperBound;
 
         public PositionDelete(String deleteFilePath, Long positionLowerBound,
-                              Long positionUpperBound) {
-            super(deleteFilePath);
+                              Long positionUpperBound, long fileSize) {
+            super(deleteFilePath, fileSize);
             this.positionLowerBound = positionLowerBound;
             this.positionUpperBound = positionUpperBound;
         }
@@ -67,8 +69,8 @@ public class IcebergDeleteFileFilter {
     static class EqualityDelete extends IcebergDeleteFileFilter {
         private List<Integer> fieldIds;
 
-        public EqualityDelete(String deleteFilePath, List<Integer> fieldIds) {
-            super(deleteFilePath);
+        public EqualityDelete(String deleteFilePath, List<Integer> fieldIds, 
long fileSize) {
+            super(deleteFilePath, fileSize);
             this.fieldIds = fieldIds;
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index fe6c54cf53b..56dda7b4fe2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -282,7 +282,7 @@ public class IcebergScanNode extends FileQueryScanNode {
         }
 
         selectedPartitionNum = partitionPathSet.size();
-
+        splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
         return splits;
     }
 
@@ -315,10 +315,11 @@ public class IcebergScanNode extends FileQueryScanNode {
                         .map(m -> 
m.get(MetadataColumns.DELETE_FILE_POS.fieldId()))
                         .map(bytes -> 
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
                 
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
-                        positionLowerBound.orElse(-1L), 
positionUpperBound.orElse(-1L)));
+                        positionLowerBound.orElse(-1L), 
positionUpperBound.orElse(-1L),
+                        delete.fileSizeInBytes()));
             } else if (delete.content() == FileContent.EQUALITY_DELETES) {
                 filters.add(IcebergDeleteFileFilter.createEqualityDelete(
-                        delete.path().toString(), delete.equalityFieldIds()));
+                        delete.path().toString(), delete.equalityFieldIds(), 
delete.fileSizeInBytes()));
             } else {
                 throw new IllegalStateException("Unknown delete content: " + 
delete.content());
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 46e8f96ba35..580d3cf1bb2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -47,6 +47,7 @@ public class IcebergSplit extends FileSplit {
         this.formatVersion = formatVersion;
         this.config = config;
         this.originalPath = originalPath;
+        this.selfSplitWeight = length;
     }
 
     public long getRowCount() {
@@ -56,4 +57,9 @@ public class IcebergSplit extends FileSplit {
     public void setRowCount(long rowCount) {
         this.rowCount = rowCount;
     }
+
+    public void setDeleteFileFilters(List<IcebergDeleteFileFilter> 
deleteFileFilters) {
+        this.deleteFileFilters = deleteFileFilters;
+        this.selfSplitWeight += 
deleteFileFilters.stream().mapToLong(IcebergDeleteFileFilter::getFilesize).sum();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 02f831ba37b..cd477cc9b29 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -28,7 +28,7 @@ import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -101,9 +101,14 @@ public class PaimonScanNode extends FileQueryScanNode {
     private int rawFileSplitNum = 0;
     private int paimonSplitNum = 0;
     private List<SplitStat> splitStats = new ArrayList<>();
+    private SessionVariable sessionVariable;
 
-    public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
+    public PaimonScanNode(PlanNodeId id,
+                          TupleDescriptor desc,
+                          boolean needCheckColumnPriv,
+                          SessionVariable sessionVariable) {
         super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, 
needCheckColumnPriv);
+        this.sessionVariable = sessionVariable;
     }
 
     @Override
@@ -176,7 +181,9 @@ public class PaimonScanNode extends FileQueryScanNode {
 
     @Override
     public List<Split> getSplits() throws UserException {
-        boolean forceJniScanner = 
ConnectContext.get().getSessionVariable().isForceJniScanner();
+        boolean forceJniScanner = sessionVariable.isForceJniScanner();
+        SessionVariable.IgnoreSplitType ignoreSplitType =
+                
SessionVariable.IgnoreSplitType.valueOf(sessionVariable.getIgnoreSplitType());
         List<Split> splits = new ArrayList<>();
         int[] projected = desc.getSlots().stream().mapToInt(
                 slot -> 
(source.getPaimonTable().rowType().getFieldNames().indexOf(slot.getColumn().getName())))
@@ -196,7 +203,11 @@ public class PaimonScanNode extends FileQueryScanNode {
                 selectedPartitionValues.add(partitionValue);
                 Optional<List<RawFile>> optRawFiles = 
dataSplit.convertToRawFiles();
                 Optional<List<DeletionFile>> optDeletionFiles = 
dataSplit.deletionFiles();
+
                 if (supportNativeReader(optRawFiles)) {
+                    if (ignoreSplitType == 
SessionVariable.IgnoreSplitType.IGNORE_NATIVE) {
+                        continue;
+                    }
                     splitStat.setType(SplitReadType.NATIVE);
                     splitStat.setRawFileConvertable(true);
                     List<RawFile> rawFiles = optRawFiles.get();
@@ -252,10 +263,16 @@ public class PaimonScanNode extends FileQueryScanNode {
                         }
                     }
                 } else {
+                    if (ignoreSplitType == 
SessionVariable.IgnoreSplitType.IGNORE_JNI) {
+                        continue;
+                    }
                     splits.add(new PaimonSplit(split));
                     ++paimonSplitNum;
                 }
             } else {
+                if (ignoreSplitType == 
SessionVariable.IgnoreSplitType.IGNORE_JNI) {
+                    continue;
+                }
                 splits.add(new PaimonSplit(split));
                 ++paimonSplitNum;
             }
@@ -263,6 +280,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
         this.selectedPartitionNum = selectedPartitionValues.size();
         // TODO: get total partition number
+        // We should set fileSplitSize at the end because fileSplitSize may be 
modified in splitFile.
+        splits.forEach(s -> s.setTargetSplitSize(fileSplitSize));
         return splits;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
index ffd063d77e8..3ab38c7db28 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
@@ -23,11 +23,14 @@ import org.apache.doris.datasource.SplitCreator;
 import org.apache.doris.datasource.TableFormatType;
 
 import com.google.common.collect.Maps;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.Split;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.UUID;
 
 public class PaimonSplit extends FileSplit {
     private static final LocationPath DUMMY_PATH = new 
LocationPath("/dummyPath", Maps.newHashMap());
@@ -35,11 +38,20 @@ public class PaimonSplit extends FileSplit {
     private TableFormatType tableFormatType;
     private Optional<DeletionFile> optDeletionFile;
 
+
     public PaimonSplit(Split split) {
         super(DUMMY_PATH, 0, 0, 0, 0, null, null);
         this.split = split;
         this.tableFormatType = TableFormatType.PAIMON;
         this.optDeletionFile = Optional.empty();
+
+        if (split instanceof DataSplit) {
+            List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles();
+            this.path = new LocationPath("/" + 
dataFileMetas.get(0).fileName());
+            this.selfSplitWeight = 
dataFileMetas.stream().mapToLong(DataFileMeta::fileSize).sum();
+        } else {
+            this.selfSplitWeight = split.rowCount();
+        }
     }
 
     private PaimonSplit(LocationPath file, long start, long length, long 
fileLength, long modificationTime,
@@ -47,6 +59,15 @@ public class PaimonSplit extends FileSplit {
         super(file, start, length, fileLength, modificationTime, hosts, 
partitionList);
         this.tableFormatType = TableFormatType.PAIMON;
         this.optDeletionFile = Optional.empty();
+        this.selfSplitWeight = length;
+    }
+
+    @Override
+    public String getConsistentHashString() {
+        if (this.path == DUMMY_PATH) {
+            return UUID.randomUUID().toString();
+        }
+        return getPathString();
     }
 
     public Split getSplit() {
@@ -66,6 +87,7 @@ public class PaimonSplit extends FileSplit {
     }
 
     public void setDeletionFile(DeletionFile deletionFile) {
+        this.selfSplitWeight += deletionFile.length();
         this.optDeletionFile = Optional.of(deletionFile);
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 3078fb36df6..654ccc8ca11 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -574,7 +574,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         } else if (table instanceof IcebergExternalTable) {
             scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
         } else if (table instanceof PaimonExternalTable) {
-            scanNode = new PaimonScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+            scanNode = new PaimonScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false,
+                ConnectContext.get().getSessionVariable());
         } else if (table instanceof TrinoConnectorExternalTable) {
             scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
         } else if (table instanceof MaxComputeExternalTable) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 152bb7cc881..d94ad0a2552 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -1986,7 +1986,8 @@ public class SingleNodePlanner {
                 scanNode = new IcebergScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
                 break;
             case PAIMON_EXTERNAL_TABLE:
-                scanNode = new PaimonScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                scanNode = new PaimonScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true,
+                    ConnectContext.get().getSessionVariable());
                 break;
             case TRINO_CONNECTOR_EXTERNAL_TABLE:
                 scanNode = new TrinoConnectorScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 52ea334a142..5f031843025 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -958,6 +958,26 @@ public class SessionVariable implements Serializable, 
Writable {
                         setter = "setPipelineTaskNum")
     public int parallelPipelineTaskNum = 0;
 
+
+    public enum IgnoreSplitType {
+        NONE,
+        IGNORE_JNI,
+        IGNORE_NATIVE
+    }
+
+    public static final String IGNORE_SPLIT_TYPE = "ignore_split_type";
+    @VariableMgr.VarAttr(name = IGNORE_SPLIT_TYPE,
+            checker = "checkIgnoreSplitType",
+            options = {"NONE", "IGNORE_JNI", "IGNORE_NATIVE"},
+            description = {"忽略指定类型的split", "Ignore splits of the specified 
type"})
+    public String ignoreSplitType = IgnoreSplitType.NONE.toString();
+
+    public static final String USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN = 
"use_consistent_hash_for_external_scan";
+    @VariableMgr.VarAttr(name = USE_CONSISTENT_HASHING_FOR_EXTERNAL_SCAN,
+            description = {"对外表采用一致性hash的方式做split的分发",
+                    "Use consistent hashing to split the appearance for 
external scan"})
+    public boolean useConsistentHashForExternalScan = false;
+
     @VariableMgr.VarAttr(name = PROFILE_LEVEL, fuzzy = true)
     public int profileLevel = 1;
 
@@ -4380,6 +4400,22 @@ public class SessionVariable implements Serializable, 
Writable {
         return forceJniScanner;
     }
 
+    public String getIgnoreSplitType() {
+        return ignoreSplitType;
+    }
+
+    public void checkIgnoreSplitType(String value) {
+        try {
+            IgnoreSplitType.valueOf(value);
+        } catch (Exception e) {
+            throw new UnsupportedOperationException("We only support `NONE`, 
`IGNORE_JNI` and `IGNORE_NATIVE`");
+        }
+    }
+
+    public boolean getUseConsistentHashForExternalScan() {
+        return useConsistentHashForExternalScan;
+    }
+
     public void setForceJniScanner(boolean force) {
         forceJniScanner = force;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java 
b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
index e86b287ac93..412e4b6792f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
@@ -48,4 +48,9 @@ public interface Split {
 
     void setAlternativeHosts(List<String> alternativeHosts);
 
+    default String getConsistentHashString() {
+        return getPathString();
+    }
+
+    void setTargetSplitSize(Long targetSplitSize);
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index df2e7dd3932..3b3e2eeedf7 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -741,4 +741,99 @@ public class FederationBackendPolicyTest {
 
         return entries1.containsAll(entries2) && 
entries2.containsAll(entries1);
     }
+
+    @Test
+    public void testSplitWeight() {
+        FileSplit fileSplit = new FileSplit(new LocationPath("s1"), 0, 1000, 
1000, 0, null, Collections.emptyList());
+        fileSplit.setSelfSplitWeight(1000L);
+
+        fileSplit.setTargetSplitSize(10L);
+        Assert.assertEquals(100L, fileSplit.getSplitWeight().getRawValue(), 
100L);
+
+        fileSplit.setTargetSplitSize(10000000L);
+        Assert.assertEquals(1L, fileSplit.getSplitWeight().getRawValue());
+
+        fileSplit.setTargetSplitSize(2000L);
+        Assert.assertEquals(50, fileSplit.getSplitWeight().getRawValue());
+    }
+
+    @Test
+    public void testBiggerSplit() throws UserException {
+        SystemInfoService service = new SystemInfoService();
+
+        Backend backend1 = new Backend(1L, "172.30.0.100", 9050);
+        backend1.setAlive(true);
+        service.addBackend(backend1);
+        Backend backend2 = new Backend(2L, "172.30.0.106", 9050);
+        backend2.setAlive(true);
+        service.addBackend(backend2);
+        Backend backend3 = new Backend(3L, "172.30.0.118", 9050);
+        backend3.setAlive(true);
+        service.addBackend(backend3);
+
+        new MockUp<Env>() {
+            @Mock
+            public SystemInfoService getCurrentSystemInfo() {
+                return service;
+            }
+        };
+
+        List<Split> splits = new ArrayList<>();
+        splits.add(genFileSplit("s1", 1000000L, 1000L)); // belong 2
+        splits.add(genFileSplit("s2", 100000L, 1000L));  // belong 2
+        splits.add(genFileSplit("s3", 200000L, 1000L));  // belong 2
+        splits.add(genFileSplit("s4", 300000L, 1000L));  // belong 2
+        splits.add(genFileSplit("s5", 800000L, 1000L));  // belong 1
+
+        FederationBackendPolicy policy = new 
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+        // Set these options to ensure that the consistent hash algorithm is 
consistent.
+        policy.setEnableSplitsRedistribution(false);
+        Config.split_assigner_min_consistent_hash_candidate_num = 1;
+        policy.init();
+        Multimap<Backend, Split> assignment = 
policy.computeScanRangeAssignment(splits);
+        Map<Backend, List<Split>> backendListMap = mergeAssignment(assignment);
+        backendListMap.forEach((k, v) -> {
+            if (k.getId() == 1) {
+                Assert.assertEquals(800000L, 
v.stream().mapToLong(Split::getLength).sum());
+            } else if (k.getId() == 2) {
+                Assert.assertEquals(1600000L, 
v.stream().mapToLong(Split::getLength).sum());
+            }
+        });
+
+        Config.split_assigner_min_consistent_hash_candidate_num = 1;
+        FederationBackendPolicy policy2 = new 
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING);
+        policy2.init();
+        Multimap<Backend, Split> assignment2 = 
policy2.computeScanRangeAssignment(splits);
+        Map<Backend, List<Split>> backendListMap2 = 
mergeAssignment(assignment2);
+        backendListMap2.forEach((k, v) -> {
+            if (k.getId() == 1) {
+                Assert.assertEquals(900000L, 
v.stream().mapToLong(Split::getLength).sum());
+            } else if (k.getId() == 2) {
+                Assert.assertEquals(500000L, 
v.stream().mapToLong(Split::getLength).sum());
+            } else if (k.getId() == 3) {
+                Assert.assertEquals(1000000L, 
v.stream().mapToLong(Split::getLength).sum());
+            }
+        });
+    }
+
+    private Map<Backend, List<Split>> mergeAssignment(Multimap<Backend, Split> 
ass) {
+        HashMap<Backend, List<Split>> map = new HashMap<>();
+        ass.forEach((k, v) -> {
+            if (map.containsKey(k)) {
+                map.get(k).add(v);
+            } else {
+                ArrayList<Split> splits = new ArrayList<>();
+                splits.add(v);
+                map.put(k, splits);
+            }
+        });
+        return map;
+    }
+
+    private FileSplit genFileSplit(String path, long length, long targetSplit) 
{
+        FileSplit s = new FileSplit(new LocationPath(path), 0, length, length, 
0, null, Collections.emptyList());
+        s.setSelfSplitWeight(length);
+        s.setTargetSplitSize(targetSplit);
+        return s;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to