This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 05b48d645bd [fix](arrow-flight-sql) Arrow Flight support multiple endpoints (#44286) 05b48d645bd is described below commit 05b48d645bda0b928df4c1e9dd42d0fa0112bc03 Author: Xinyi Zou <zouxi...@selectdb.com> AuthorDate: Mon Nov 25 10:39:35 2024 +0800 [fix](arrow-flight-sql) Arrow Flight support multiple endpoints (#44286) ### What problem does this PR solve? Problem Summary: By default, the query results of all BE nodes will be aggregated to one BE node. ADBC Client will only receive one endpoint and pull data from the BE node corresponding to this endpoint. Run `set global enable_parallel_result_sink=true;` to allow each BE to return query results separately. ADBC Client will receive multiple endpoints and pull data from each endpoint. --- .../java/org/apache/doris/qe/ConnectContext.java | 40 ++---- .../main/java/org/apache/doris/qe/Coordinator.java | 32 ++--- .../org/apache/doris/qe/NereidsCoordinator.java | 29 ++-- .../arrowflight/DorisFlightSqlProducer.java | 77 ++++++----- .../arrowflight/FlightSqlConnectProcessor.java | 154 +++++++++++---------- .../results/FlightSqlEndpointsLocation.java | 65 +++++++++ 6 files changed, 228 insertions(+), 169 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 5e0716da7d7..a16422ba9e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -19,7 +19,6 @@ package org.apache.doris.qe; import org.apache.doris.analysis.BoolLiteral; import org.apache.doris.analysis.DecimalLiteral; -import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.FloatLiteral; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.LiteralExpr; @@ -63,11 +62,11 @@ import org.apache.doris.plsql.executor.PlSqlOperation; import 
org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.resource.Tag; import org.apache.doris.service.arrowflight.results.FlightSqlChannel; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.Histogram; import org.apache.doris.system.Backend; import org.apache.doris.task.LoadTaskInfo; -import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TResultSinkType; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; @@ -134,10 +133,7 @@ public class ConnectContext { protected volatile String peerIdentity; private final Map<String, String> preparedQuerys = new HashMap<>(); private String runningQuery; - private TNetworkAddress resultFlightServerAddr; - private TNetworkAddress resultInternalServiceAddr; - private ArrayList<Expr> resultOutputExprs; - private TUniqueId finstId; + private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations = Lists.newArrayList(); private boolean returnResultFromLocal = true; // mysql net protected volatile MysqlChannel mysqlChannel; @@ -730,36 +726,16 @@ public class ConnectContext { return runningQuery; } - public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) { - this.resultFlightServerAddr = resultFlightServerAddr; + public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation flightSqlEndpointsLocation) { + this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation); } - public TNetworkAddress getResultFlightServerAddr() { - return resultFlightServerAddr; + public List<FlightSqlEndpointsLocation> getFlightSqlEndpointsLocations() { + return flightSqlEndpointsLocations; } - public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) { - this.resultInternalServiceAddr = resultInternalServiceAddr; - } - - public TNetworkAddress getResultInternalServiceAddr() { - return 
resultInternalServiceAddr; - } - - public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) { - this.resultOutputExprs = resultOutputExprs; - } - - public ArrayList<Expr> getResultOutputExprs() { - return resultOutputExprs; - } - - public void setFinstId(TUniqueId finstId) { - this.finstId = finstId; - } - - public TUniqueId getFinstId() { - return finstId; + public void clearFlightSqlEndpointsLocations() { + flightSqlEndpointsLocations.clear(); } public void setReturnResultFromLocal(boolean returnResultFromLocal) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index dee130886ec..acd0fbe0dae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -83,6 +83,7 @@ import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.task.LoadEtlTask; @@ -733,29 +734,27 @@ public class Coordinator implements CoordInterface { enableParallelResultSink = queryOptions.isEnableParallelOutfile(); } - TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host; Set<TNetworkAddress> addrs = new HashSet<>(); for (FInstanceExecParam param : topParams.instanceExecParams) { if (addrs.contains(param.host)) { continue; } addrs.add(param.host); - receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host), - toBrpcHost(param.host), this.timeoutDeadline, - context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink)); - } - - if (!context.isReturnResultFromLocal()) { - 
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)); - if (enableParallelResultSink) { - context.setFinstId(queryId); + if (context.isReturnResultFromLocal()) { + receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host), + toBrpcHost(param.host), this.timeoutDeadline, + context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink)); } else { - context.setFinstId(topParams.instanceExecParams.get(0).instanceId); + Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)); + TUniqueId finstId; + if (enableParallelResultSink) { + finstId = queryId; + } else { + finstId = topParams.instanceExecParams.get(0).instanceId; + } + context.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId, + toArrowFlightHost(param.host), toBrpcHost(param.host), fragments.get(0).getOutputExprs())); } - context.setFinstId(topParams.instanceExecParams.get(0).instanceId); - context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr)); - context.setResultInternalServiceAddr(toBrpcHost(execBeAddr)); - context.setResultOutputExprs(fragments.get(0).getOutputExprs()); } LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId), @@ -766,7 +765,8 @@ public class Coordinator implements CoordInterface { // set the broker address for OUTFILE sink ResultFileSink topResultFileSink = (ResultFileSink) topDataSink; FsBroker broker = Env.getCurrentEnv().getBrokerMgr() - .getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname()); + .getBroker(topResultFileSink.getBrokerName(), + topParams.instanceExecParams.get(0).host.getHostname()); topResultFileSink.setBrokerAddr(broker.host, broker.port); } } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java index d718089fcab..a9d6becc7fa 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java @@ -52,6 +52,7 @@ import org.apache.doris.qe.runtime.SingleFragmentPipelineTask; import org.apache.doris.qe.runtime.ThriftPlansBuilder; import org.apache.doris.resource.workloadgroup.QueryQueue; import org.apache.doris.resource.workloadgroup.QueueToken; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TErrorTabletInfo; import org.apache.doris.thrift.TNetworkAddress; @@ -90,7 +91,7 @@ public class NereidsCoordinator extends Coordinator { this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext)); Preconditions.checkState(!planner.getFragments().isEmpty() - && coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚"); + && coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚"); } // broker load @@ -431,18 +432,22 @@ public class NereidsCoordinator extends Coordinator { if (dataSink instanceof ResultSink || dataSink instanceof ResultFileSink) { if (connectContext != null && !connectContext.isReturnResultFromLocal()) { Preconditions.checkState(connectContext.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)); - - AssignedJob firstInstance = topPlan.getInstanceJobs().get(0); - BackendWorker worker = (BackendWorker) firstInstance.getAssignedWorker(); - Backend backend = worker.getBackend(); - - connectContext.setFinstId(firstInstance.instanceId()); - if (backend.getArrowFlightSqlPort() < 0) { - throw new IllegalStateException("be arrow_flight_sql_port cannot be empty."); + for (AssignedJob instance : topPlan.getInstanceJobs()) { + BackendWorker worker = (BackendWorker) instance.getAssignedWorker(); + Backend backend = worker.getBackend(); + if (backend.getArrowFlightSqlPort() < 0) { + throw new IllegalStateException("be arrow_flight_sql_port 
cannot be empty."); + } + TUniqueId finstId; + if (connectContext.getSessionVariable().enableParallelResultSink()) { + finstId = getQueryId(); + } else { + finstId = instance.instanceId(); + } + connectContext.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId, + backend.getArrowFlightAddress(), backend.getBrpcAddress(), + topPlan.getFragmentJob().getFragment().getOutputExprs())); } - connectContext.setResultFlightServerAddr(backend.getArrowFlightAddress()); - connectContext.setResultInternalServiceAddr(backend.getBrpcAddress()); - connectContext.setResultOutputExprs(topPlan.getFragmentJob().getFragment().getOutputExprs()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 9d44a55b081..b968ab04c57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -25,11 +25,13 @@ import org.apache.doris.common.util.Util; import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry; import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.protobuf.Any; import com.google.protobuf.ByteString; import com.google.protobuf.Message; @@ -187,6 +189,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable Preconditions.checkState(!query.isEmpty()); // After the previous query was executed, there was no getStreamStatement to take away the result. 
connectContext.getFlightSqlChannel().reset(); + connectContext.clearFlightSqlEndpointsLocations(); try (FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext)) { flightSQLConnectProcessor.handleQuery(query); if (connectContext.getState().getStateType() == MysqlStateType.ERR) { @@ -225,50 +228,52 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable } } else { // Now only query stmt will pull results from BE. - Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000); - if (schema == null) { + flightSQLConnectProcessor.fetchArrowFlightSchema(5000); + if (flightSQLConnectProcessor.getArrowSchema() == null) { throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null") .toRuntimeException(); } - TUniqueId queryId = connectContext.queryId(); - if (!connectContext.getSessionVariable().enableParallelResultSink()) { - // only one instance - queryId = connectContext.getFinstId(); - } - // Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located. - final ByteString handle = ByteString.copyFromUtf8( - DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname - + "&" + connectContext.getResultInternalServiceAddr().port + "&" + query); - TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle) - .build(); - Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); - // TODO Support multiple endpoints. - Location location; - if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) { - // In a production environment, it is often inconvenient to expose Doris BE nodes - // to the external network. - // However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, - // and the external client will be randomly routed to a Doris BE node when connecting to nginx. 
- // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. - // If it is different from the Doris BE node randomly routed by nginx, - // data forwarding needs to be done inside the Doris BE node. - if (flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) { - location = Location.forGrpcInsecure( - flightSQLConnectProcessor.getPublicAccessAddr().hostname, - flightSQLConnectProcessor.getPublicAccessAddr().port); + List<FlightEndpoint> endpoints = Lists.newArrayList(); + for (FlightSqlEndpointsLocation endpointLoc : connectContext.getFlightSqlEndpointsLocations()) { + TUniqueId tid = endpointLoc.getFinstId(); + // Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located. + final ByteString handle = ByteString.copyFromUtf8( + DebugUtil.printId(tid) + "&" + endpointLoc.getResultInternalServiceAddr().hostname + "&" + + endpointLoc.getResultInternalServiceAddr().port + "&" + query); + TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder() + .setStatementHandle(handle).build(); + Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray()); + Location location; + if (endpointLoc.getResultPublicAccessAddr().isSetHostname()) { + // In a production environment, it is often inconvenient to expose Doris BE nodes + // to the external network. + // However, a reverse proxy (such as nginx) can be added to all Doris BE nodes, + // and the external client will be randomly routed to a Doris BE node when connecting + // to nginx. + // The query results of Arrow Flight SQL will be randomly saved on a Doris BE node. + // If it is different from the Doris BE node randomly routed by nginx, + // data forwarding needs to be done inside the Doris BE node. 
+ if (endpointLoc.getResultPublicAccessAddr().isSetPort()) { + location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname, + endpointLoc.getResultPublicAccessAddr().port); + } else { + location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname, + endpointLoc.getResultFlightServerAddr().port); + } } else { - location = Location.forGrpcInsecure( - flightSQLConnectProcessor.getPublicAccessAddr().hostname, - connectContext.getResultFlightServerAddr().port); + location = Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname, + endpointLoc.getResultFlightServerAddr().port); } - } else { - location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname, - connectContext.getResultFlightServerAddr().port); + // By default, the query results of all BE nodes will be aggregated to one BE node. + // ADBC Client will only receive one endpoint and pull data from the BE node + // corresponding to this endpoint. + // `set global enable_parallel_result_sink=true;` to allow each BE to return query results + // separately. ADBC Client will receive multiple endpoints and pull data from each endpoint. + endpoints.add(new FlightEndpoint(ticket, location)); } - List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); // TODO Set in BE callback after query end, Client will not callback. 
- return new FlightInfo(schema, descriptor, endpoints, -1, -1); + return new FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, -1, -1); } } } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java index db5213cb7d4..3fba602a1c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java @@ -31,6 +31,7 @@ import org.apache.doris.qe.ConnectProcessor; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; +import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TUniqueId; @@ -58,7 +59,7 @@ import java.util.concurrent.TimeoutException; */ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class); - private TNetworkAddress publicAccessAddr = new TNetworkAddress(); + private Schema arrowSchema; public FlightSqlConnectProcessor(ConnectContext context) { super(context); @@ -67,8 +68,8 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC context.setReturnResultFromLocal(true); } - public TNetworkAddress getPublicAccessAddr() { - return publicAccessAddr; + public Schema getArrowSchema() { + return arrowSchema; } public void prepare(MysqlCommand command) { @@ -107,80 +108,87 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC // handleFieldList(tableName); // } - public Schema fetchArrowFlightSchema(int timeoutMs) { - TNetworkAddress address 
= ctx.getResultInternalServiceAddr(); - TUniqueId tid; - if (ctx.getSessionVariable().enableParallelResultSink()) { - tid = ctx.queryId(); - } else { - // only one instance - tid = ctx.getFinstId(); + public void fetchArrowFlightSchema(int timeoutMs) { + if (ctx.getFlightSqlEndpointsLocations().isEmpty()) { + throw new RuntimeException("fetch arrow flight schema failed, no FlightSqlEndpointsLocations."); } - ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs(); - Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); - try { - InternalService.PFetchArrowFlightSchemaRequest request = - InternalService.PFetchArrowFlightSchemaRequest.newBuilder() - .setFinstId(queryId) - .build(); - - Future<InternalService.PFetchArrowFlightSchemaResult> future - = BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request); - InternalService.PFetchArrowFlightSchemaResult pResult; - pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); - if (pResult == null) { - throw new RuntimeException(String.format("fetch arrow flight schema timeout, queryId: %s", - DebugUtil.printId(tid))); - } - Status resultStatus = new Status(pResult.getStatus()); - if (resultStatus.getErrorCode() != TStatusCode.OK) { - throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", - DebugUtil.printId(tid), resultStatus)); - } - if (pResult.hasBeArrowFlightIp()) { - publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8()); - } - if (pResult.hasBeArrowFlightPort()) { - publicAccessAddr.setPort(pResult.getBeArrowFlightPort()); - } - if (pResult.hasSchema() && pResult.getSchema().size() > 0) { - RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); - ArrowStreamReader arrowStreamReader = new ArrowStreamReader( - new ByteArrayInputStream(pResult.getSchema().toByteArray()), - rootAllocator - ); - try { - VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); - 
List<FieldVector> fieldVectors = root.getFieldVectors(); - if (fieldVectors.size() != resultOutputExprs.size()) { - throw new RuntimeException(String.format( - "Schema size %s' is not equal to arrow field size %s, queryId: %s.", - fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); + for (FlightSqlEndpointsLocation endpointLoc : ctx.getFlightSqlEndpointsLocations()) { + TNetworkAddress address = endpointLoc.getResultInternalServiceAddr(); + TUniqueId tid = endpointLoc.getFinstId(); + ArrayList<Expr> resultOutputExprs = endpointLoc.getResultOutputExprs(); + Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build(); + try { + InternalService.PFetchArrowFlightSchemaRequest request + = InternalService.PFetchArrowFlightSchemaRequest.newBuilder().setFinstId(queryId).build(); + + Future<InternalService.PFetchArrowFlightSchemaResult> future = BackendServiceProxy.getInstance() + .fetchArrowFlightSchema(address, request); + InternalService.PFetchArrowFlightSchemaResult pResult; + pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS); + if (pResult == null) { + throw new RuntimeException( + String.format("fetch arrow flight schema timeout, queryId: %s", DebugUtil.printId(tid))); + } + Status resultStatus = new Status(pResult.getStatus()); + if (resultStatus.getErrorCode() != TStatusCode.OK) { + throw new RuntimeException( + String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s", + DebugUtil.printId(tid), resultStatus)); + } + + TNetworkAddress resultPublicAccessAddr = new TNetworkAddress(); + if (pResult.hasBeArrowFlightIp()) { + resultPublicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8()); + } + if (pResult.hasBeArrowFlightPort()) { + resultPublicAccessAddr.setPort(pResult.getBeArrowFlightPort()); + } + endpointLoc.setResultPublicAccessAddr(resultPublicAccessAddr); + if (pResult.hasSchema() && pResult.getSchema().size() > 0) { + RootAllocator rootAllocator = new 
RootAllocator(Integer.MAX_VALUE); + ArrowStreamReader arrowStreamReader = new ArrowStreamReader( + new ByteArrayInputStream(pResult.getSchema().toByteArray()), rootAllocator); + try { + Schema schema; + VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); + List<FieldVector> fieldVectors = root.getFieldVectors(); + if (fieldVectors.size() != resultOutputExprs.size()) { + throw new RuntimeException( + String.format("Schema size %s' is not equal to arrow field size %s, queryId: %s.", + fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid))); + } + schema = root.getSchema(); + if (arrowSchema == null) { + arrowSchema = schema; + } else if (!arrowSchema.equals(schema)) { + throw new RuntimeException(String.format( + "The schema returned by results BE is different, first schema: %s, " + + "new schema: %s, queryId: %s,backend: %s", arrowSchema, schema, + DebugUtil.printId(tid), address)); + } + } catch (Exception e) { + throw new RuntimeException("Read Arrow Flight Schema failed.", e); } - return root.getSchema(); - } catch (Exception e) { - throw new RuntimeException("Read Arrow Flight Schema failed.", e); + } else { + throw new RuntimeException( + String.format("get empty arrow flight schema, queryId: %s", DebugUtil.printId(tid))); } - } else { - throw new RuntimeException(String.format("get empty arrow flight schema, queryId: %s", - DebugUtil.printId(tid))); + } catch (RpcException e) { + throw new RuntimeException( + String.format("arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (InterruptedException e) { + throw new RuntimeException( + String.format("arrow flight schema future get interrupted exception, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); + } catch (ExecutionException e) { + throw new RuntimeException( + String.format("arrow flight schema future get execution exception, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); 
+ } catch (TimeoutException e) { + throw new RuntimeException(String.format("arrow flight schema fetch timeout, queryId: %s,backend: %s", + DebugUtil.printId(tid), address), e); } - } catch (RpcException e) { - throw new RuntimeException(String.format( - "arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); - } catch (InterruptedException e) { - throw new RuntimeException(String.format( - "arrow flight schema future get interrupted exception, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); - } catch (ExecutionException e) { - throw new RuntimeException(String.format( - "arrow flight schema future get execution exception, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); - } catch (TimeoutException e) { - throw new RuntimeException(String.format( - "arrow flight schema fetch timeout, queryId: %s,backend: %s", - DebugUtil.printId(tid), address), e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java new file mode 100644 index 00000000000..61adc797cc5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/results/FlightSqlEndpointsLocation.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.service.arrowflight.results; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TUniqueId; + +import java.util.ArrayList; + +public class FlightSqlEndpointsLocation { + private TUniqueId finstId; + private TNetworkAddress resultFlightServerAddr; + private TNetworkAddress resultInternalServiceAddr; + private TNetworkAddress resultPublicAccessAddr; + private ArrayList<Expr> resultOutputExprs; + + public FlightSqlEndpointsLocation(TUniqueId finstId, TNetworkAddress resultFlightServerAddr, + TNetworkAddress resultInternalServiceAddr, ArrayList<Expr> resultOutputExprs) { + this.finstId = finstId; + this.resultFlightServerAddr = resultFlightServerAddr; + this.resultInternalServiceAddr = resultInternalServiceAddr; + this.resultPublicAccessAddr = new TNetworkAddress(); + this.resultOutputExprs = resultOutputExprs; + } + + public TUniqueId getFinstId() { + return finstId; + } + + public TNetworkAddress getResultFlightServerAddr() { + return resultFlightServerAddr; + } + + public TNetworkAddress getResultInternalServiceAddr() { + return resultInternalServiceAddr; + } + + public void setResultPublicAccessAddr(TNetworkAddress resultPublicAccessAddr) { + this.resultPublicAccessAddr = resultPublicAccessAddr; + } + + public TNetworkAddress getResultPublicAccessAddr() { + return resultPublicAccessAddr; + } + + public ArrayList<Expr> getResultOutputExprs() { + return resultOutputExprs; + } +} --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org