This is an automated email from the ASF dual-hosted git repository.

luozenglin pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 76c0cf05831 [cherry-pick](fe) select BE local broker to scan Hive table when 'broker.name' in hms catalog is specified (#27122) (#27252)
76c0cf05831 is described below

commit 76c0cf058318ef81d843d79ea5c2aae9db7f8458
Author: DuRipeng <453243...@qq.com>
AuthorDate: Mon Nov 20 11:43:15 2023 +0800

    [cherry-pick](fe) select BE local broker to scan Hive table when 'broker.name' in hms catalog is specified (#27122) (#27252)
    
    Since #24830 introduced `broker.name` in the hms catalog, data scans run on
    the specified brokers.
    The [doris operator](https://github.com/selectdb/doris-operator) supports
    deploying a BE and a broker in the same pod, and having a BE access its
    local broker is the fastest way to read data.
    Previously, each input split selected one BE to execute on and then
    randomly selected one broker for the actual data access, so the BE and the
    broker it used were typically located in separate K8S pods.
    This PR optimizes the broker selection strategy to prefer the broker local
    to the selected BE when `broker.name` is specified in the hms catalog.
---
 .../apache/doris/planner/external/FileQueryScanNode.java | 16 +++++++++++++---
 .../org/apache/doris/planner/external/HiveScanNode.java  |  2 ++
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 7b57d9b0454..9468cc42881 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -97,6 +97,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
     @Getter
     protected TableSample tableSample;
 
+    protected String brokerName;
+
     /**
      * External file scan node for Query hms table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check 
column priv
@@ -292,7 +294,6 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         for (Split split : inputSplits) {
             FileSplit fileSplit = (FileSplit) split;
             TFileType locationType = 
getLocationType(fileSplit.getPath().toString());
-            setLocationPropertiesIfNecessary(locationType, locationProperties);
 
             TScanRangeLocations curLocations = newLocations();
             // If fileSplit has partition values, use the values collected 
from hive partitions.
@@ -352,6 +353,7 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             } else {
                 selectedBackend = backendPolicy.getNextBe();
             }
+            setLocationPropertiesIfNecessary(selectedBackend, locationType, 
locationProperties);
             location.setBackendId(selectedBackend.getId());
             location.setServer(new TNetworkAddress(selectedBackend.getHost(), 
selectedBackend.getBePort()));
             curLocations.addToLocations(location);
@@ -368,7 +370,7 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
                 scanRangeLocations.size(), (System.currentTimeMillis() - 
start));
     }
 
-    private void setLocationPropertiesIfNecessary(TFileType locationType,
+    private void setLocationPropertiesIfNecessary(Backend selectedBackend, 
TFileType locationType,
             Map<String, String> locationProperties) throws UserException {
         if (locationType == TFileType.FILE_HDFS || locationType == 
TFileType.FILE_BROKER) {
             if (!params.isSetHdfsParams()) {
@@ -381,7 +383,15 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
                 params.setProperties(locationProperties);
 
                 if (!params.isSetBrokerAddresses()) {
-                    FsBroker broker = 
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                    FsBroker broker;
+                    if (brokerName != null) {
+                        broker = 
Env.getCurrentEnv().getBrokerMgr().getBroker(brokerName, 
selectedBackend.getHost());
+                        LOG.debug(String.format(
+                                "Set location for broker [%s], selected BE 
host: [%s] selected broker host: [%s]",
+                                brokerName, selectedBackend.getHost(), 
broker.host));
+                    } else {
+                        broker = 
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                    }
                     if (broker == null) {
                         throw new UserException("No alive broker.");
                     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index d4141810487..943d30017e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -100,12 +100,14 @@ public class HiveScanNode extends FileQueryScanNode {
     public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
         super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, 
needCheckColumnPriv);
         hmsTable = (HMSExternalTable) desc.getTable();
+        brokerName = hmsTable.getCatalog().bindBrokerName();
     }
 
     public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
                         StatisticalType statisticalType, boolean 
needCheckColumnPriv) {
         super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
         hmsTable = (HMSExternalTable) desc.getTable();
+        brokerName = hmsTable.getCatalog().bindBrokerName();
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to