This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 05b48d645bd [fix](arrow-flight-sql) Arrow Flight support multiple 
endpoints (#44286)
05b48d645bd is described below

commit 05b48d645bda0b928df4c1e9dd42d0fa0112bc03
Author: Xinyi Zou <zouxi...@selectdb.com>
AuthorDate: Mon Nov 25 10:39:35 2024 +0800

    [fix](arrow-flight-sql) Arrow Flight support multiple endpoints (#44286)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    By default, the query results of all BE nodes will be aggregated to one
    BE node. ADBC Client will only receive one endpoint and pull data from
    the BE node corresponding to this endpoint.
    
    `set global enable_parallel_result_sink=true;` to allow each BE to
    return query results separately. ADBC Client will receive multiple
    endpoints and pull data from each endpoint.
---
 .../java/org/apache/doris/qe/ConnectContext.java   |  40 ++----
 .../main/java/org/apache/doris/qe/Coordinator.java |  32 ++---
 .../org/apache/doris/qe/NereidsCoordinator.java    |  29 ++--
 .../arrowflight/DorisFlightSqlProducer.java        |  77 ++++++-----
 .../arrowflight/FlightSqlConnectProcessor.java     | 154 +++++++++++----------
 .../results/FlightSqlEndpointsLocation.java        |  65 +++++++++
 6 files changed, 228 insertions(+), 169 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 5e0716da7d7..a16422ba9e5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -19,7 +19,6 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.BoolLiteral;
 import org.apache.doris.analysis.DecimalLiteral;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.FloatLiteral;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
@@ -63,11 +62,11 @@ import org.apache.doris.plsql.executor.PlSqlOperation;
 import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.Histogram;
 import org.apache.doris.system.Backend;
 import org.apache.doris.task.LoadTaskInfo;
-import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TResultSinkType;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
@@ -134,10 +133,7 @@ public class ConnectContext {
     protected volatile String peerIdentity;
     private final Map<String, String> preparedQuerys = new HashMap<>();
     private String runningQuery;
-    private TNetworkAddress resultFlightServerAddr;
-    private TNetworkAddress resultInternalServiceAddr;
-    private ArrayList<Expr> resultOutputExprs;
-    private TUniqueId finstId;
+    private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations 
= Lists.newArrayList();
     private boolean returnResultFromLocal = true;
     // mysql net
     protected volatile MysqlChannel mysqlChannel;
@@ -730,36 +726,16 @@ public class ConnectContext {
         return runningQuery;
     }
 
-    public void setResultFlightServerAddr(TNetworkAddress 
resultFlightServerAddr) {
-        this.resultFlightServerAddr = resultFlightServerAddr;
+    public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation 
flightSqlEndpointsLocation) {
+        this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation);
     }
 
-    public TNetworkAddress getResultFlightServerAddr() {
-        return resultFlightServerAddr;
+    public List<FlightSqlEndpointsLocation> getFlightSqlEndpointsLocations() {
+        return flightSqlEndpointsLocations;
     }
 
-    public void setResultInternalServiceAddr(TNetworkAddress 
resultInternalServiceAddr) {
-        this.resultInternalServiceAddr = resultInternalServiceAddr;
-    }
-
-    public TNetworkAddress getResultInternalServiceAddr() {
-        return resultInternalServiceAddr;
-    }
-
-    public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
-        this.resultOutputExprs = resultOutputExprs;
-    }
-
-    public ArrayList<Expr> getResultOutputExprs() {
-        return resultOutputExprs;
-    }
-
-    public void setFinstId(TUniqueId finstId) {
-        this.finstId = finstId;
-    }
-
-    public TUniqueId getFinstId() {
-        return finstId;
+    public void clearFlightSqlEndpointsLocations() {
+        flightSqlEndpointsLocations.clear();
     }
 
     public void setReturnResultFromLocal(boolean returnResultFromLocal) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index dee130886ec..acd0fbe0dae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -83,6 +83,7 @@ import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.LoadEtlTask;
@@ -733,29 +734,27 @@ public class Coordinator implements CoordInterface {
                 enableParallelResultSink = 
queryOptions.isEnableParallelOutfile();
             }
 
-            TNetworkAddress execBeAddr = 
topParams.instanceExecParams.get(0).host;
             Set<TNetworkAddress> addrs = new HashSet<>();
             for (FInstanceExecParam param : topParams.instanceExecParams) {
                 if (addrs.contains(param.host)) {
                     continue;
                 }
                 addrs.add(param.host);
-                receivers.add(new ResultReceiver(queryId, param.instanceId, 
addressToBackendID.get(param.host),
-                        toBrpcHost(param.host), this.timeoutDeadline,
-                        
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), 
enableParallelResultSink));
-            }
-
-            if (!context.isReturnResultFromLocal()) {
-                
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
-                if (enableParallelResultSink) {
-                    context.setFinstId(queryId);
+                if (context.isReturnResultFromLocal()) {
+                    receivers.add(new ResultReceiver(queryId, 
param.instanceId, addressToBackendID.get(param.host),
+                            toBrpcHost(param.host), this.timeoutDeadline,
+                            
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), 
enableParallelResultSink));
                 } else {
-                    
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
+                    
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
+                    TUniqueId finstId;
+                    if (enableParallelResultSink) {
+                        finstId = queryId;
+                    } else {
+                        finstId = 
topParams.instanceExecParams.get(0).instanceId;
+                    }
+                    context.addFlightSqlEndpointsLocation(new 
FlightSqlEndpointsLocation(finstId,
+                            toArrowFlightHost(param.host), 
toBrpcHost(param.host), fragments.get(0).getOutputExprs()));
                 }
-                
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
-                
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
-                context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
-                
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
             }
 
             LOG.info("dispatch result sink of query {} to {}", 
DebugUtil.printId(queryId),
@@ -766,7 +765,8 @@ public class Coordinator implements CoordInterface {
                 // set the broker address for OUTFILE sink
                 ResultFileSink topResultFileSink = (ResultFileSink) 
topDataSink;
                 FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
-                        .getBroker(topResultFileSink.getBrokerName(), 
execBeAddr.getHostname());
+                        .getBroker(topResultFileSink.getBrokerName(),
+                                
topParams.instanceExecParams.get(0).host.getHostname());
                 topResultFileSink.setBrokerAddr(broker.host, broker.port);
             }
         } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
index d718089fcab..a9d6becc7fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
@@ -52,6 +52,7 @@ import org.apache.doris.qe.runtime.SingleFragmentPipelineTask;
 import org.apache.doris.qe.runtime.ThriftPlansBuilder;
 import org.apache.doris.resource.workloadgroup.QueryQueue;
 import org.apache.doris.resource.workloadgroup.QueueToken;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TErrorTabletInfo;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -90,7 +91,7 @@ public class NereidsCoordinator extends Coordinator {
         
this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext));
 
         Preconditions.checkState(!planner.getFragments().isEmpty()
-                        && coordinatorContext.instanceNum.get() > 0, "Fragment 
and Instance can not be empty˚");
+                && coordinatorContext.instanceNum.get() > 0, "Fragment and 
Instance can not be empty˚");
     }
 
     // broker load
@@ -431,18 +432,22 @@ public class NereidsCoordinator extends Coordinator {
         if (dataSink instanceof ResultSink || dataSink instanceof 
ResultFileSink) {
             if (connectContext != null && 
!connectContext.isReturnResultFromLocal()) {
                 
Preconditions.checkState(connectContext.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
-
-                AssignedJob firstInstance = topPlan.getInstanceJobs().get(0);
-                BackendWorker worker = (BackendWorker) 
firstInstance.getAssignedWorker();
-                Backend backend = worker.getBackend();
-
-                connectContext.setFinstId(firstInstance.instanceId());
-                if (backend.getArrowFlightSqlPort() < 0) {
-                    throw new IllegalStateException("be arrow_flight_sql_port 
cannot be empty.");
+                for (AssignedJob instance : topPlan.getInstanceJobs()) {
+                    BackendWorker worker = (BackendWorker) 
instance.getAssignedWorker();
+                    Backend backend = worker.getBackend();
+                    if (backend.getArrowFlightSqlPort() < 0) {
+                        throw new IllegalStateException("be 
arrow_flight_sql_port cannot be empty.");
+                    }
+                    TUniqueId finstId;
+                    if 
(connectContext.getSessionVariable().enableParallelResultSink()) {
+                        finstId = getQueryId();
+                    } else {
+                        finstId = instance.instanceId();
+                    }
+                    connectContext.addFlightSqlEndpointsLocation(new 
FlightSqlEndpointsLocation(finstId,
+                            backend.getArrowFlightAddress(), 
backend.getBrpcAddress(),
+                            
topPlan.getFragmentJob().getFragment().getOutputExprs()));
                 }
-                
connectContext.setResultFlightServerAddr(backend.getArrowFlightAddress());
-                
connectContext.setResultInternalServiceAddr(backend.getBrpcAddress());
-                
connectContext.setResultOutputExprs(topPlan.getFragmentJob().getFragment().getOutputExprs());
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 9d44a55b081..b968ab04c57 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -25,11 +25,13 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
 import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -187,6 +189,7 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
             Preconditions.checkState(!query.isEmpty());
             // After the previous query was executed, there was no 
getStreamStatement to take away the result.
             connectContext.getFlightSqlChannel().reset();
+            connectContext.clearFlightSqlEndpointsLocations();
             try (FlightSqlConnectProcessor flightSQLConnectProcessor = new 
FlightSqlConnectProcessor(connectContext)) {
                 flightSQLConnectProcessor.handleQuery(query);
                 if (connectContext.getState().getStateType() == 
MysqlStateType.ERR) {
@@ -225,50 +228,52 @@ public class DorisFlightSqlProducer implements 
FlightSqlProducer, AutoCloseable
                     }
                 } else {
                     // Now only query stmt will pull results from BE.
-                    Schema schema = 
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
-                    if (schema == null) {
+                    flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
+                    if (flightSQLConnectProcessor.getArrowSchema() == null) {
                         throw CallStatus.INTERNAL.withDescription("fetch arrow 
flight schema is null")
                                 .toRuntimeException();
                     }
 
-                    TUniqueId queryId = connectContext.queryId();
-                    if 
(!connectContext.getSessionVariable().enableParallelResultSink()) {
-                        // only one instance
-                        queryId = connectContext.getFinstId();
-                    }
-                    // Ticket contains the IP and Brpc Port of the Doris BE 
node where the query result is located.
-                    final ByteString handle = ByteString.copyFromUtf8(
-                            DebugUtil.printId(queryId) + "&" + 
connectContext.getResultInternalServiceAddr().hostname
-                                    + "&" + 
connectContext.getResultInternalServiceAddr().port + "&" + query);
-                    TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder().setStatementHandle(handle)
-                            .build();
-                    Ticket ticket = new 
Ticket(Any.pack(ticketStatement).toByteArray());
-                    // TODO Support multiple endpoints.
-                    Location location;
-                    if 
(flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
-                        // In a production environment, it is often 
inconvenient to expose Doris BE nodes
-                        // to the external network.
-                        // However, a reverse proxy (such as nginx) can be 
added to all Doris BE nodes,
-                        // and the external client will be randomly routed to 
a Doris BE node when connecting to nginx.
-                        // The query results of Arrow Flight SQL will be 
randomly saved on a Doris BE node.
-                        // If it is different from the Doris BE node randomly 
routed by nginx,
-                        // data forwarding needs to be done inside the Doris 
BE node.
-                        if 
(flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) {
-                            location = Location.forGrpcInsecure(
-                                    
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
-                                    
flightSQLConnectProcessor.getPublicAccessAddr().port);
+                    List<FlightEndpoint> endpoints = Lists.newArrayList();
+                    for (FlightSqlEndpointsLocation endpointLoc : 
connectContext.getFlightSqlEndpointsLocations()) {
+                        TUniqueId tid = endpointLoc.getFinstId();
+                        // Ticket contains the IP and Brpc Port of the Doris 
BE node where the query result is located.
+                        final ByteString handle = ByteString.copyFromUtf8(
+                                DebugUtil.printId(tid) + "&" + 
endpointLoc.getResultInternalServiceAddr().hostname + "&"
+                                        + 
endpointLoc.getResultInternalServiceAddr().port + "&" + query);
+                        TicketStatementQuery ticketStatement = 
TicketStatementQuery.newBuilder()
+                                .setStatementHandle(handle).build();
+                        Ticket ticket = new 
Ticket(Any.pack(ticketStatement).toByteArray());
+                        Location location;
+                        if 
(endpointLoc.getResultPublicAccessAddr().isSetHostname()) {
+                            // In a production environment, it is often 
inconvenient to expose Doris BE nodes
+                            // to the external network.
+                            // However, a reverse proxy (such as nginx) can be 
added to all Doris BE nodes,
+                            // and the external client will be randomly routed 
to a Doris BE node when connecting
+                            // to nginx.
+                            // The query results of Arrow Flight SQL will be 
randomly saved on a Doris BE node.
+                            // If it is different from the Doris BE node 
randomly routed by nginx,
+                            // data forwarding needs to be done inside the 
Doris BE node.
+                            if 
(endpointLoc.getResultPublicAccessAddr().isSetPort()) {
+                                location = 
Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
+                                        
endpointLoc.getResultPublicAccessAddr().port);
+                            } else {
+                                location = 
Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
+                                        
endpointLoc.getResultFlightServerAddr().port);
+                            }
                         } else {
-                            location = Location.forGrpcInsecure(
-                                    
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
-                                    
connectContext.getResultFlightServerAddr().port);
+                            location = 
Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname,
+                                    
endpointLoc.getResultFlightServerAddr().port);
                         }
-                    } else {
-                        location = 
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
-                                
connectContext.getResultFlightServerAddr().port);
+                        // By default, the query results of all BE nodes will 
be aggregated to one BE node.
+                        // ADBC Client will only receive one endpoint and pull 
data from the BE node
+                        // corresponding to this endpoint.
+                        // `set global enable_parallel_result_sink=true;` to 
allow each BE to return query results
+                        // separately. ADBC Client will receive multiple 
endpoints and pull data from each endpoint.
+                        endpoints.add(new FlightEndpoint(ticket, location));
                     }
-                    List<FlightEndpoint> endpoints = 
Collections.singletonList(new FlightEndpoint(ticket, location));
                     // TODO Set in BE callback after query end, Client will 
not callback.
-                    return new FlightInfo(schema, descriptor, endpoints, -1, 
-1);
+                    return new 
FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, 
-1, -1);
                 }
             }
         } catch (Exception e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index db5213cb7d4..3fba602a1c1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -31,6 +31,7 @@ import org.apache.doris.qe.ConnectProcessor;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
@@ -58,7 +59,7 @@ import java.util.concurrent.TimeoutException;
  */
 public class FlightSqlConnectProcessor extends ConnectProcessor implements 
AutoCloseable {
     private static final Logger LOG = 
LogManager.getLogger(FlightSqlConnectProcessor.class);
-    private TNetworkAddress publicAccessAddr = new TNetworkAddress();
+    private Schema arrowSchema;
 
     public FlightSqlConnectProcessor(ConnectContext context) {
         super(context);
@@ -67,8 +68,8 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
         context.setReturnResultFromLocal(true);
     }
 
-    public TNetworkAddress getPublicAccessAddr() {
-        return publicAccessAddr;
+    public Schema getArrowSchema() {
+        return arrowSchema;
     }
 
     public void prepare(MysqlCommand command) {
@@ -107,80 +108,87 @@ public class FlightSqlConnectProcessor extends 
ConnectProcessor implements AutoC
     //     handleFieldList(tableName);
     // }
 
-    public Schema fetchArrowFlightSchema(int timeoutMs) {
-        TNetworkAddress address = ctx.getResultInternalServiceAddr();
-        TUniqueId tid;
-        if (ctx.getSessionVariable().enableParallelResultSink()) {
-            tid = ctx.queryId();
-        } else {
-            // only one instance
-            tid = ctx.getFinstId();
+    public void fetchArrowFlightSchema(int timeoutMs) {
+        if (ctx.getFlightSqlEndpointsLocations().isEmpty()) {
+            throw new RuntimeException("fetch arrow flight schema failed, no 
FlightSqlEndpointsLocations.");
         }
-        ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
-        Types.PUniqueId queryId = 
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
-        try {
-            InternalService.PFetchArrowFlightSchemaRequest request =
-                    InternalService.PFetchArrowFlightSchemaRequest.newBuilder()
-                            .setFinstId(queryId)
-                            .build();
-
-            Future<InternalService.PFetchArrowFlightSchemaResult> future
-                    = 
BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request);
-            InternalService.PFetchArrowFlightSchemaResult pResult;
-            pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
-            if (pResult == null) {
-                throw new RuntimeException(String.format("fetch arrow flight 
schema timeout, queryId: %s",
-                        DebugUtil.printId(tid)));
-            }
-            Status resultStatus = new Status(pResult.getStatus());
-            if (resultStatus.getErrorCode() != TStatusCode.OK) {
-                throw new RuntimeException(String.format("fetch arrow flight 
schema failed, queryId: %s, errmsg: %s",
-                        DebugUtil.printId(tid), resultStatus));
-            }
-            if (pResult.hasBeArrowFlightIp()) {
-                
publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
-            }
-            if (pResult.hasBeArrowFlightPort()) {
-                publicAccessAddr.setPort(pResult.getBeArrowFlightPort());
-            }
-            if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
-                RootAllocator rootAllocator = new 
RootAllocator(Integer.MAX_VALUE);
-                ArrowStreamReader arrowStreamReader = new ArrowStreamReader(
-                        new 
ByteArrayInputStream(pResult.getSchema().toByteArray()),
-                        rootAllocator
-                );
-                try {
-                    VectorSchemaRoot root = 
arrowStreamReader.getVectorSchemaRoot();
-                    List<FieldVector> fieldVectors = root.getFieldVectors();
-                    if (fieldVectors.size() != resultOutputExprs.size()) {
-                        throw new RuntimeException(String.format(
-                                "Schema size %s' is not equal to arrow field 
size %s, queryId: %s.",
-                                fieldVectors.size(), resultOutputExprs.size(), 
DebugUtil.printId(tid)));
+        for (FlightSqlEndpointsLocation endpointLoc : 
ctx.getFlightSqlEndpointsLocations()) {
+            TNetworkAddress address = 
endpointLoc.getResultInternalServiceAddr();
+            TUniqueId tid = endpointLoc.getFinstId();
+            ArrayList<Expr> resultOutputExprs = 
endpointLoc.getResultOutputExprs();
+            Types.PUniqueId queryId = 
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
+            try {
+                InternalService.PFetchArrowFlightSchemaRequest request
+                        = 
InternalService.PFetchArrowFlightSchemaRequest.newBuilder().setFinstId(queryId).build();
+
+                Future<InternalService.PFetchArrowFlightSchemaResult> future = 
BackendServiceProxy.getInstance()
+                        .fetchArrowFlightSchema(address, request);
+                InternalService.PFetchArrowFlightSchemaResult pResult;
+                pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
+                if (pResult == null) {
+                    throw new RuntimeException(
+                            String.format("fetch arrow flight schema timeout, 
queryId: %s", DebugUtil.printId(tid)));
+                }
+                Status resultStatus = new Status(pResult.getStatus());
+                if (resultStatus.getErrorCode() != TStatusCode.OK) {
+                    throw new RuntimeException(
+                            String.format("fetch arrow flight schema failed, 
queryId: %s, errmsg: %s",
+                                    DebugUtil.printId(tid), resultStatus));
+                }
+
+                TNetworkAddress resultPublicAccessAddr = new TNetworkAddress();
+                if (pResult.hasBeArrowFlightIp()) {
+                    
resultPublicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
+                }
+                if (pResult.hasBeArrowFlightPort()) {
+                    
resultPublicAccessAddr.setPort(pResult.getBeArrowFlightPort());
+                }
+                endpointLoc.setResultPublicAccessAddr(resultPublicAccessAddr);
+                if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
+                    RootAllocator rootAllocator = new 
RootAllocator(Integer.MAX_VALUE);
+                    ArrowStreamReader arrowStreamReader = new 
ArrowStreamReader(
+                            new 
ByteArrayInputStream(pResult.getSchema().toByteArray()), rootAllocator);
+                    try {
+                        Schema schema;
+                        VectorSchemaRoot root = 
arrowStreamReader.getVectorSchemaRoot();
+                        List<FieldVector> fieldVectors = 
root.getFieldVectors();
+                        if (fieldVectors.size() != resultOutputExprs.size()) {
+                            throw new RuntimeException(
+                                    String.format("Schema size %s' is not 
equal to arrow field size %s, queryId: %s.",
+                                            fieldVectors.size(), 
resultOutputExprs.size(), DebugUtil.printId(tid)));
+                        }
+                        schema = root.getSchema();
+                        if (arrowSchema == null) {
+                            arrowSchema = schema;
+                        } else if (!arrowSchema.equals(schema)) {
+                            throw new RuntimeException(String.format(
+                                    "The schema returned by results BE is 
different, first schema: %s, "
+                                            + "new schema: %s, queryId: 
%s,backend: %s", arrowSchema, schema,
+                                    DebugUtil.printId(tid), address));
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException("Read Arrow Flight Schema 
failed.", e);
                     }
-                    return root.getSchema();
-                } catch (Exception e) {
-                    throw new RuntimeException("Read Arrow Flight Schema 
failed.", e);
+                } else {
+                    throw new RuntimeException(
+                            String.format("get empty arrow flight schema, 
queryId: %s", DebugUtil.printId(tid)));
                 }
-            } else {
-                throw new RuntimeException(String.format("get empty arrow 
flight schema, queryId: %s",
-                        DebugUtil.printId(tid)));
+            } catch (RpcException e) {
+                throw new RuntimeException(
+                        String.format("arrow flight schema fetch catch rpc 
exception, queryId: %s,backend: %s",
+                                DebugUtil.printId(tid), address), e);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(
+                        String.format("arrow flight schema future get 
interrupted exception, queryId: %s,backend: %s",
+                                DebugUtil.printId(tid), address), e);
+            } catch (ExecutionException e) {
+                throw new RuntimeException(
+                        String.format("arrow flight schema future get 
execution exception, queryId: %s,backend: %s",
+                                DebugUtil.printId(tid), address), e);
+            } catch (TimeoutException e) {
+                throw new RuntimeException(String.format("arrow flight schema 
fetch timeout, queryId: %s,backend: %s",
+                        DebugUtil.printId(tid), address), e);
             }
-        } catch (RpcException e) {
-            throw new RuntimeException(String.format(
-                    "arrow flight schema fetch catch rpc exception, queryId: 
%s,backend: %s",
-                    DebugUtil.printId(tid), address), e);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(String.format(
-                    "arrow flight schema future get interrupted exception, 
queryId: %s,backend: %s",
-                    DebugUtil.printId(tid), address), e);
-        } catch (ExecutionException e) {
-            throw new RuntimeException(String.format(
-                    "arrow flight schema future get execution exception, 
queryId: %s,backend: %s",
-                    DebugUtil.printId(tid), address), e);
-        } catch (TimeoutException e) {
-            throw new RuntimeException(String.format(
-                    "arrow flight schema fetch timeout, queryId: %s,backend: 
%s",
-                    DebugUtil.printId(tid), address), e);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java
new file mode 100644
index 00000000000..61adc797cc5
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.service.arrowflight.results;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TUniqueId;
+
+import java.util.ArrayList;
+
+public class FlightSqlEndpointsLocation {
+    private TUniqueId finstId;
+    private TNetworkAddress resultFlightServerAddr;
+    private TNetworkAddress resultInternalServiceAddr;
+    private TNetworkAddress resultPublicAccessAddr;
+    private ArrayList<Expr> resultOutputExprs;
+
+    public FlightSqlEndpointsLocation(TUniqueId finstId, TNetworkAddress 
resultFlightServerAddr,
+            TNetworkAddress resultInternalServiceAddr, ArrayList<Expr> 
resultOutputExprs) {
+        this.finstId = finstId;
+        this.resultFlightServerAddr = resultFlightServerAddr;
+        this.resultInternalServiceAddr = resultInternalServiceAddr;
+        this.resultPublicAccessAddr = new TNetworkAddress();
+        this.resultOutputExprs = resultOutputExprs;
+    }
+
+    public TUniqueId getFinstId() {
+        return finstId;
+    }
+
+    public TNetworkAddress getResultFlightServerAddr() {
+        return resultFlightServerAddr;
+    }
+
+    public TNetworkAddress getResultInternalServiceAddr() {
+        return resultInternalServiceAddr;
+    }
+
+    public void setResultPublicAccessAddr(TNetworkAddress 
resultPublicAccessAddr) {
+        this.resultPublicAccessAddr = resultPublicAccessAddr;
+    }
+
+    public TNetworkAddress getResultPublicAccessAddr() {
+        return resultPublicAccessAddr;
+    }
+
+    public ArrayList<Expr> getResultOutputExprs() {
+        return resultOutputExprs;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to